Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] support reading config options from file in Function Python Runner #10

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pulsar-functions/instance/src/main/python/python_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def __init__(self,
pulsar_client,
secrets_provider,
cluster_name,
state_storage_serviceurl):
state_storage_serviceurl,
config_file):
self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
self.user_code = user_code
# set queue size to one since consumers already have internal queues. Just use queue to communicate message from
Expand Down Expand Up @@ -114,6 +115,7 @@ def __init__(self,
instance_id, cluster_name,
"%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name)]
self.stats = Stats(self.metrics_labels)
self.config_file = config_file

def health_check(self):
self.last_health_check_ts = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,50 @@ def atexit_function(signo, _frame):
Log.info("Interrupted by %d, shutting down" % signo)
to_run = False

def merge_arguments(args, config_file):
"""
This function is used to merge arguments passed in via the command line
and those passed in via the configuration file during initialization.

:param args: arguments passed in via the command line
:param config_file: configuration file name (path)

During the merge process, the arguments passed in via the command line have higher priority,
so only optional arguments need to be merged.
"""
config = util.read_config(config_file)
if not config:
return
default_config = config["DEFAULT"]
if not default_config:
return
if not args.client_auth_plugin and default_config.get("client_auth_plugin", None):
args.client_auth_plugin = default_config.get("client_auth_plugin")
if not args.client_auth_params and default_config.get("client_auth_params", None):
args.client_auth_params = default_config.get("client_auth_params")
if not args.use_tls and default_config.get("use_tls", None):
args.use_tls = default_config.get("use_tls")
if not args.tls_allow_insecure_connection and default_config.get("tls_allow_insecure_connection", None):
args.tls_allow_insecure_connection = default_config.get("tls_allow_insecure_connection")
if not args.hostname_verification_enabled and default_config.get("hostname_verification_enabled", None):
args.hostname_verification_enabled = default_config.get("hostname_verification_enabled")
if not args.tls_trust_cert_path and default_config.get("tls_trust_cert_path", None):
args.tls_trust_cert_path = default_config.get("tls_trust_cert_path")
if not args.logging_level and default_config.get("logging_level", None):
args.logging_level = default_config.get("logging_level")
if not args.secrets_provider and default_config.get("secrets_provider", None):
args.secrets_provider = default_config.get("secrets_provider")
if not args.secrets_provider_config and default_config.get("secrets_provider_config", None):
args.secrets_provider_config = default_config.get("secrets_provider_config")
if not args.install_usercode_dependencies and default_config.get("install_usercode_dependencies", None):
args.install_usercode_dependencies = default_config.get("install_usercode_dependencies")
if not args.dependency_repository and default_config.get("dependency_repository", None):
args.dependency_repository = default_config.get("dependency_repository")
if not args.extra_dependency_repository and default_config.get("extra_dependency_repository", None):
args.extra_dependency_repository = default_config.get("extra_dependency_repository")
if not args.state_storage_serviceurl and default_config.get("state_storage_serviceurl", None):
args.state_storage_serviceurl = default_config.get("state_storage_serviceurl")

def main():
# Setup signal handlers
signal.signal(signal.SIGTERM, atexit_function)
Expand Down Expand Up @@ -88,8 +132,10 @@ def main():
parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from')
parser.add_argument('--state_storage_serviceurl', required=False, help='Managed State Storage Service Url')
parser.add_argument('--cluster_name', required=True, help='The name of the cluster this instance is running on')
parser.add_argument('--config_file', required=False, default="", help='Configuration file name', type=str)

args = parser.parse_args()
merge_arguments(args, args.config_file)
function_details = Function_pb2.FunctionDetails()
args.function_details = str(args.function_details)
if args.function_details[0] == '\'':
Expand Down Expand Up @@ -216,7 +262,8 @@ def main():
pulsar_client,
secrets_provider,
args.cluster_name,
state_storage_serviceurl)
state_storage_serviceurl,
args.config_file)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)

Expand Down
19 changes: 19 additions & 0 deletions pulsar-functions/instance/src/main/python/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import inspect
import sys
import importlib
import configparser

from threading import Timer
from pulsar.functions import serde

Expand Down Expand Up @@ -80,6 +82,23 @@ def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id):
def get_properties(fullyQualifiedName, instanceId):
return {"application": "pulsar-function", "id": str(fullyQualifiedName), "instance_id": str(instanceId)}

def read_config(config_file):
"""
The content of the configuration file is styled as follows:

[DEFAULT]
parameter1 = value1
parameter2 = value2
parameter3 = value3
...
"""
if config_file == "":
return None

cfg = configparser.ConfigParser()
cfg.read(config_file)
return cfg

class FixedTimer():

def __init__(self, t, hFunction, name="timer-thread"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,15 @@ public static void main(String[] args) throws Exception {
arguments.authPluginClassName = arguments.deprecatedAuthPluginClassName;
}

for (String arg : arguments.topic) {
if (arg.startsWith("-")) {
System.out.printf("invalid option: '%s'\nTo use a topic with the name '%s', "
+ "please use a fully qualified topic name\n", arg, arg);
jc.usage();
PerfClientUtils.exit(-1);
}
}

if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) {
// keep compatibility with the previous version
if (arguments.topic.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,15 @@ public static void main(String[] args) throws Exception {
arguments.authPluginClassName = arguments.deprecatedAuthPluginClassName;
}

for (String arg : arguments.topics) {
if (arg.startsWith("-")) {
System.out.printf("invalid option: '%s'\nTo use a topic with the name '%s', "
+ "please use a fully qualified topic name\n", arg, arg);
jc.usage();
PerfClientUtils.exit(-1);
}
}

if (arguments.topics != null && arguments.topics.size() != arguments.numTopics) {
// keep compatibility with the previous version
if (arguments.topics.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ public static void main(String[] args) throws Exception {
PerfClientUtils.exit(-1);
}

for (String arg : arguments.topic) {
if (arg.startsWith("-")) {
System.out.printf("invalid option: '%s'\nTo use a topic with the name '%s', "
+ "please use a fully qualified topic name\n", arg, arg);
jc.usage();
PerfClientUtils.exit(-1);
}
}

if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) {
// keep compatibility with the previous version
if (arguments.topic.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ public void replayComplete() {
+ tcID.toString() + " change state to Ready error when init it"));

} else {
completableFuture.complete(MLTransactionMetadataStore.this);
recoverTracker.handleCommittingAndAbortingTransaction();
timeoutTracker.start();
completableFuture.complete(MLTransactionMetadataStore.this);
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
Expand Down