diff --git a/bindings/python/bergamot.cpp b/bindings/python/bergamot.cpp
index 474bc0d51..00b2e3a77 100644
--- a/bindings/python/bergamot.cpp
+++ b/bindings/python/bergamot.cpp
@@ -1,3 +1,4 @@
+// #define PYBIND11_DETAILED_ERROR_MESSAGES // Enables debugging
 #include <pybind11/iostream.h>
 #include <pybind11/pybind11.h>
 #include <pybind11/stl.h>
@@ -29,6 +30,7 @@ using Alignment = std::vector<std::vector<float>>;
 using Alignments = std::vector<Alignment>;

 PYBIND11_MAKE_OPAQUE(std::vector<std::string>);
+PYBIND11_MAKE_OPAQUE(std::vector<size_t>);
 PYBIND11_MAKE_OPAQUE(std::vector<Response>);
 PYBIND11_MAKE_OPAQUE(std::unordered_map<std::string, std::string>);
 PYBIND11_MAKE_OPAQUE(Alignments);
@@ -212,11 +214,13 @@ PYBIND11_MODULE(_bergamot, m) {
       .def("pivot", &ServicePyAdapter::pivot)
       .def("setTerminology", &ServicePyAdapter::setTerminology);

+  py::bind_vector<std::vector<size_t>>(m, "VectorSizeT");
   py::class_<Service::Config>(m, "ServiceConfig")
-      .def(py::init<>([](size_t numWorkers, size_t cacheSize, std::string logging, std::string pathToTerminologyFile,
+      .def(py::init<>([](size_t numWorkers, std::vector<size_t> gpuWorkers, size_t cacheSize, std::string logging, std::string pathToTerminologyFile,
                          bool terminologyForce, std::string terminologyForm) {
             Service::Config config;
             config.numWorkers = numWorkers;
+            config.gpuWorkers = gpuWorkers;
             config.cacheSize = cacheSize;
             config.logger.level = logging;
             config.terminologyFile = pathToTerminologyFile;
@@ -224,10 +228,12 @@ PYBIND11_MODULE(_bergamot, m) {
             config.format = terminologyForm;
             return config;
           }),
-          py::arg("numWorkers") = 1, py::arg("cacheSize") = 0, py::arg("logLevel") = "off",
+          py::arg("numWorkers") = 1, py::arg("gpuWorkers") = std::vector<size_t>{},
+          py::arg("cacheSize") = 0, py::arg("logLevel") = "off",
           py::arg("pathToTerminologyFile") = "", py::arg("terminologyForce") = false,
           py::arg("terminologyForm") = "%s %s ")
       .def_readwrite("numWorkers", &Service::Config::numWorkers)
+      .def_readwrite("gpuWorkers", &Service::Config::gpuWorkers)
       .def_readwrite("cacheSize", &Service::Config::cacheSize)
       .def_readwrite("pathToTerminologyFile", &Service::Config::terminologyFile)
       .def_readwrite("terminologyForce", &Service::Config::terminologyForce)
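For orientation, here is a minimal sketch (not part of the patch) of how this new binding surface is meant to be driven from Python. It assumes the compiled module is importable as `bergamot`, as translator.py below does; `model.yml` is a placeholder path:

```python
# Sketch only. Assumes the compiled bindings are importable as `bergamot`;
# "model.yml" is a placeholder model configuration path.
import bergamot

config = bergamot.ServiceConfig(
    numWorkers=0,                             # CPU workers must be 0 when GPU workers are given
    gpuWorkers=bergamot.VectorSizeT([0, 1]),  # opaque std::vector<size_t>: one worker per listed device
    cacheSize=0,
    logLevel="off",
)
service = bergamot.Service(config)
model = service.modelFromConfigPath("model.yml")
```

Because `std::vector<size_t>` is registered as an opaque type, plain Python lists must be wrapped in `VectorSizeT`, which is exactly what the wrapper below does.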
diff --git a/bindings/python/translator.py b/bindings/python/translator.py
index b5297dba7..75f0233ce 100755
--- a/bindings/python/translator.py
+++ b/bindings/python/translator.py
@@ -9,6 +9,7 @@ class Translator:

     Attributes:
         _num_workers Number of parallel CPU workers.
+        _gpu_workers Indices of the GPU devices to use. _num_workers must be set to zero when this is non-empty.
         _cache: Cache size. 0 to disable cache.
         _logging: Log level: trace, debug, info, warn, err(or), critical, off. Default is off
         _terminology: Path to a TSV terminology file
@@ -21,6 +22,7 @@ class Translator:
         _service The translation service
     """
     _num_workers: int
+    _gpu_workers: List[int]
     _cache: int
     _logging: str
     _terminology: str
@@ -32,26 +34,28 @@ class Translator:
     _responseOpts: bergamot.ResponseOptions
     _service: bergamot.Service

-    def __init__(self, model_conifg_path: str, num_workers: int=1, cache: int=0, \
+    def __init__(self, model_conifg_path: str, num_workers: int=1, gpu_workers: List[int]=[], cache: int=0, \
                  logging="off", terminology: str="", force_terminology: bool=False,\
                  terminology_form: str="%s %s "):
         """Initialises the translator class

         :param model_conifg_path: Path to the configuration file for the translation model.
         :param num_workers: Number of CPU workers.
+        :param gpu_workers: Indices of the GPU devices. num_workers must be zero if this is non-empty.
         :param cache: cache size. 0 means no cache.
         :param logging: Log level: trace, debug, info, warn, err(or), critical, off. Default is off.
         :param terminology: Path to terminology file, TSV format
         :param force_terminology: Force terminology to appear on the target side. May impact translation quality.
         """
         self._num_workers = num_workers
+        self._gpu_workers = gpu_workers
         self._cache = cache
         self._logging = logging
         self._terminology = terminology
         self._force_terminology = force_terminology
         self._terminology_form = terminology_form
-        self._config = bergamot.ServiceConfig(self._num_workers, self._cache, self._logging, self._terminology, self._force_terminology, self._terminology_form)
+        self._config = bergamot.ServiceConfig(self._num_workers, bergamot.VectorSizeT(self._gpu_workers), self._cache, self._logging, self._terminology, self._force_terminology, self._terminology_form)
         self._service = bergamot.Service(self._config)
         self._responseOpts = bergamot.ResponseOptions()  # Default false for all, if we want to enable HTML later, from here
         self._model = self._service.modelFromConfigPath(model_conifg_path)
@@ -64,7 +68,7 @@ def reset_terminology(self, terminology: str="", force_terminology: bool=False)
         """
         self._terminology = terminology
         self._force_terminology = force_terminology
-        self._config = bergamot.ServiceConfig(self._num_workers, self._cache, self._logging, self._terminology, self._force_terminology, self._terminology_form)
+        self._config = bergamot.ServiceConfig(self._num_workers, bergamot.VectorSizeT(self._gpu_workers), self._cache, self._logging, self._terminology, self._force_terminology, self._terminology_form)
         self._service = bergamot.Service(self._config)

     def reset_terminology(self, terminology: Dict[str,str], force_terminology: bool=False) -> None:
@@ -81,7 +85,16 @@ def reset_num_workers(self, num_workers) -> None:
         :return: None
         """
         self._num_workers = num_workers
-        self._config = bergamot.ServiceConfig(self._num_workers, self._cache, self._logging, self._terminology, self._force_terminology, self._terminology_form)
+        self._config = bergamot.ServiceConfig(self._num_workers, bergamot.VectorSizeT(self._gpu_workers), self._cache, self._logging, self._terminology, self._force_terminology, self._terminology_form)
+        self._service = bergamot.Service(self._config)
+
+    def reset_gpu_workers(self, gpu_workers: List[int]) -> None:
+        """Resets the GPU workers in use.
+        :param gpu_workers: Indices of the GPU devices to be used.
+        :return: None
+        """
+        self._gpu_workers = gpu_workers
+        self._config = bergamot.ServiceConfig(self._num_workers, bergamot.VectorSizeT(self._gpu_workers), self._cache, self._logging, self._terminology, self._force_terminology, self._terminology_form)
         self._service = bergamot.Service(self._config)

     def translate(self, sentences: List[str]) -> List[str]:
@@ -98,6 +111,7 @@ def main():
     parser = argparse.ArgumentParser(description="bergamot-translator interface")
     parser.add_argument("--config", '-c', required=True, type=str, help='Model YML configuration input.')
     parser.add_argument("--num-workers", '-n', type=int, default=1, help='Number of CPU workers.')
+    parser.add_argument("--num-gpus", "-g", type=int, nargs='+', default=None, help='List of GPU device indices to use. Requires --num-workers 0.')
     parser.add_argument("--logging", '-l', type=str, default="off", help='Set verbosity level of logging: trace, debug, info, warn, err(or), critical, off. Default is off')
     parser.add_argument("--cache-size", type=int, default=0, help='Cache size. 0 means caching is disabled')
     parser.add_argument("--terminology-tsv", '-t', default="", type=str, help='Path to a terminology file TSV')
@@ -107,7 +121,8 @@
     parser.add_argument("--batch", '-b', default=32, type=int, help="Number of lines to process in a batch")
     args = parser.parse_args()

-    translator = Translator(args.config, args.num_workers, args.cache_size, args.logging, args.terminology_tsv, args.force_terminology, args.terminology_form)
+    num_gpus = args.num_gpus if args.num_gpus is not None else []
+    translator = Translator(args.config, args.num_workers, num_gpus, args.cache_size, args.logging, args.terminology_tsv, args.force_terminology, args.terminology_form)

     if args.path_to_input is None:
         infile = stdin
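A usage sketch for the high-level wrapper (not part of the patch; `model.yml` is a placeholder, and the import path assumes translator.py is importable as a module):

```python
# Sketch only: two GPU replicas via the high-level wrapper.
# num_workers must be 0 whenever gpu_workers is non-empty.
from translator import Translator  # assumed import path for the class above

t = Translator("model.yml", num_workers=0, gpu_workers=[0, 1])
print(t.translate(["How are you doing?"]))
```

With the flags above, the equivalent command-line invocation is roughly `translator.py -c model.yml -n 0 -g 0 1`.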
diff --git a/src/translator/service.cpp b/src/translator/service.cpp
index b8f008810..0d1936ab8 100644
--- a/src/translator/service.cpp
+++ b/src/translator/service.cpp
@@ -158,8 +158,16 @@ AsyncService::AsyncService(const AsyncService::Config &config)
       safeBatchingPool_(),
       cache_(makeOptionalCache(config_.cacheSize, /*mutexBuckets=*/config_.numWorkers)),
       logger_(config.logger) {
-  ABORT_IF(config_.numWorkers == 0, "Number of workers should be at least 1 in a threaded workflow");
-  workers_.reserve(config_.numWorkers);
+  if (!config_.gpuWorkers.empty()) {
+    ABORT_IF(config_.numWorkers != 0, "Unable to mix GPU and CPU workers.");
+    workers_.reserve(config_.gpuWorkers.size());
+    // HACK: the rest of the code uses numWorkers as the source of truth for the worker count.
+    // @TODO: refactor so that gpuWorkers is used directly.
+    config_.numWorkers = config_.gpuWorkers.size();
+  } else {
+    ABORT_IF(config_.numWorkers == 0, "Number of workers should be at least 1 in a threaded workflow");
+    workers_.reserve(config_.numWorkers);
+  }
   // Initiate terminology map if present
   if (!config_.terminologyFile.empty()) {
     // Create an input filestream
@@ -283,7 +291,6 @@ void AsyncService::translate(std::shared_ptr<TranslationModel> translationModel,
       html->restore(response);
       callback(std::move(response));
     };
-
     translateRaw(translationModel, std::move(source), internalCallback, responseOptions);
   }
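The constructor above makes the CPU and GPU backends mutually exclusive. A sketch (hypothetical model path, `Translator` from translator.py above) of what a caller sees when mixing them:

```python
# Sketch only: requesting CPU and GPU workers together is rejected,
# because AsyncService aborts during construction.
t = Translator("model.yml", num_workers=4, gpu_workers=[0])
# -> aborts with: "Unable to mix GPU and CPU workers."
```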
diff --git a/src/translator/service.h b/src/translator/service.h
index 1058ae595..d100c3c65 100644
--- a/src/translator/service.h
+++ b/src/translator/service.h
@@ -109,6 +109,7 @@ class BlockingService {
 class AsyncService {
  public:
   struct Config {
+    std::vector<size_t> gpuWorkers;  ///< GPU device indices to run workers on. If non-empty, GPU workers are used instead of CPU workers.
     size_t numWorkers{1};  ///< How many worker translation threads to spawn.
     size_t cacheSize{0};   ///< Size in History items to be stored in the cache. Loosely corresponds to sentences to
                            /// cache in the real world. A value of 0 means no caching.
@@ -120,6 +121,7 @@ class AsyncService {
   template <class App>
   static void addOptions(App &app, Config &config) {
     app.add_option("--cpu-threads", config.numWorkers, "Workers to form translation backend");
+    app.add_option("--gpu-workers", config.gpuWorkers, "GPU device indices for the translation backend; mutually exclusive with --cpu-threads.");
     app.add_option("--cache-size", config.cacheSize, "Number of entries to store in cache.");
     app.add_option("--terminology-file", config.terminologyFile, "tsv, one term at a time terminology file.");
     app.add_option(
@@ -138,7 +140,7 @@ class AsyncService {
   /// backend needed based on worker threads set. See TranslationModel for documentation on other params.
   Ptr<TranslationModel> createCompatibleModel(const TranslationModel::Config &config) {
     // @TODO: Remove this dependency/coupling.
-    return New<TranslationModel>(config, /*replicas=*/config_.numWorkers);
+    return New<TranslationModel>(config, /*replicas=*/config_.numWorkers, config_.gpuWorkers);
   }

   /// With the supplied TranslationModel, translate an input. A Response is constructed with optional items set/unset
diff --git a/src/translator/translation_model.cpp b/src/translator/translation_model.cpp
index 6f8dd4dc8..538a3073c 100644
--- a/src/translator/translation_model.cpp
+++ b/src/translator/translation_model.cpp
@@ -16,13 +16,14 @@ namespace bergamot {
 std::atomic<size_t> TranslationModel::modelCounter_ = 0;

 TranslationModel::TranslationModel(const Config &options, MemoryBundle &&memory /*=MemoryBundle{}*/,
-                                   size_t replicas /*=1*/)
+                                   size_t replicas /*=1*/, std::vector<size_t> gpus /*={}*/)
     : modelId_(modelCounter_++),
       options_(options),
       memory_(std::move(memory)),
       vocabs_(options, std::move(memory_.vocabs)),
       textProcessor_(options, vocabs_, std::move(memory_.ssplitPrefixFile)),
+      gpus_{gpus},
       batchingPool_(options),
       qualityEstimator_(createQualityEstimator(getQualityEstimatorModel(memory, options))) {
   ABORT_IF(replicas == 0, "At least one replica needs to be created.");
   backend_.resize(replicas);
@@ -53,7 +54,13 @@ void TranslationModel::loadBackend(size_t idx) {
   auto &graph = backend_[idx].graph;
   auto &scorerEnsemble = backend_[idx].scorerEnsemble;

-  marian::DeviceId device_(idx, DeviceType::cpu);
+  marian::DeviceId device_;
+
+  if (gpus_.empty()) {
+    device_ = marian::DeviceId(idx, DeviceType::cpu);
+  } else {
+    device_ = marian::DeviceId(gpus_[idx], DeviceType::gpu);
+  }
   graph = New<ExpressionGraph>(/*inference=*/true);  // set the graph to be inference only
   auto prec = options_->get<std::vector<std::string>>("precision", {"float32"});
   graph->setDefaultElementType(typeFromString(prec[0]));
diff --git a/src/translator/translation_model.h b/src/translator/translation_model.h
index 53980b4e9..7e411eed1 100644
--- a/src/translator/translation_model.h
+++ b/src/translator/translation_model.h
@@ -47,8 +47,8 @@ class TranslationModel {
   /// operandi.
   ///
   /// TODO(@jerinphilip): Clean this up.
-  TranslationModel(const std::string& config, MemoryBundle&& memory, size_t replicas = 1)
-      : TranslationModel(parseOptionsFromString(config, /*validate=*/false), std::move(memory), replicas){};
+  TranslationModel(const std::string& config, MemoryBundle&& memory, size_t replicas = 1, std::vector<size_t> gpus = {})
+      : TranslationModel(parseOptionsFromString(config, /*validate=*/false), std::move(memory), replicas, gpus){};

   /// Construct TranslationModel from marian-options. If memory is empty, TranslationModel is initialized from
   /// paths available in the options object, backed by filesystem. Otherwise, TranslationModel is initialized from the
@@ -57,10 +57,11 @@ class TranslationModel {
   /// @param [in] options: Marian options object.
   /// @param [in] memory: MemoryBundle object holding memory buffers containing parameters to build MarianBackend,
   /// ShortlistGenerator, Vocabs and SentenceSplitter.
-  TranslationModel(const Config& options, MemoryBundle&& memory, size_t replicas = 1);
+  /// @param [in] gpus: Optional array of GPU device ids to load the model onto.
+  TranslationModel(const Config& options, MemoryBundle&& memory, size_t replicas = 1, std::vector<size_t> gpus = {});

-  TranslationModel(const Config& options, size_t replicas = 1)
-      : TranslationModel(options, getMemoryBundleFromConfig(options), replicas) {}
+  TranslationModel(const Config& options, size_t replicas = 1, std::vector<size_t> gpus = {})
+      : TranslationModel(options, getMemoryBundleFromConfig(options), replicas, gpus) {}

   /// Make a Request to be translated by this TranslationModel instance.
   /// @param [in] requestId: Unique identifier associated with this request, available from Service.
@@ -103,6 +104,7 @@ class TranslationModel {
   MemoryBundle memory_;
   Vocabs vocabs_;
   TextProcessor textProcessor_;
+  std::vector<size_t> gpus_;  ///< GPU device indices to place replicas on; empty means CPU backends.
   /// Maintains sentences from multiple requests bucketed by length and sorted by priority in each bucket.
   BatchingPool batchingPool_;
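For completeness, a sketch of moving a running service between devices with the new setter (hypothetical path; `Translator` from translator.py above):

```python
# Sketch only: each reset_* call rebuilds the underlying service, so the
# model replicas are reloaded onto the newly requested devices.
t = Translator("model.yml", num_workers=0, gpu_workers=[0])
t.reset_gpu_workers([2, 3])  # reload the backend onto GPUs 2 and 3
```

One caveat grounded in the code above: switching from GPU back to CPU cannot be done one setter at a time, because `reset_num_workers` rebuilds the service while `_gpu_workers` is still non-empty and so trips the mixed-backend abort in AsyncService; the two attributes would need to change together.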