Skip to content

Commit

Permalink
add env variable
Browse files Browse the repository at this point in the history
  • Loading branch information
yuli_han authored and yuli_han committed Feb 28, 2024
1 parent ac5095b commit ec55e43
Showing 1 changed file with 29 additions and 13 deletions.
42 changes: 29 additions & 13 deletions api/py/ai/chronon/repo/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def download_only_once(url, path):
@retry_decorator(retries=3, backoff=50)
def download_jar(version, jar_type="uber", release_tag=None, spark_version="2.4.0"):
assert (
spark_version in SUPPORTED_SPARK
spark_version in SUPPORTED_SPARK
), f"Received unsupported spark version {spark_version}. Supported spark versions are {SUPPORTED_SPARK}"
scala_version = SCALA_VERSION_FOR_SPARK[spark_version]
maven_url_prefix = os.environ.get("CHRONON_MAVEN_MIRROR_PREFIX", None)
Expand Down Expand Up @@ -237,6 +237,7 @@ def set_runtime_env(args):
- Environment variables derived from args (like app_name)
- conf.metaData.modeToEnvMap for the mode (set on config)
- team environment per context and mode set on teams.json
- production team environment per mode set on teams.json
- default team environment per context and mode set on teams.json
- Common Environment set in teams.json
"""
Expand All @@ -245,6 +246,7 @@ def set_runtime_env(args):
"conf_env": {},
"default_env": {},
"team_env": {},
"production_team_env ": {},
"cli_args": {},
}
conf_type = None
Expand All @@ -262,7 +264,7 @@ def set_runtime_env(args):
)
if args.conf and effective_mode:
try:
context, conf_type, team, _ = args.conf.split("/")[-4:]
_, conf_type, team, _ = args.conf.split("/")[-4:]
except Exception as e:
logging.error(
"Invalid conf path: {}, please ensure to supply the relative path to zipline/ folder".format(
Expand All @@ -272,6 +274,9 @@ def set_runtime_env(args):
raise e
if not team:
team = "default"
# context is the environment in which the job is running, which is provided from the args,
# default to be dev.
context = args.env
logging.info(
f"Context: {context} -- conf_type: {conf_type} -- team: {team}"
)
Expand All @@ -298,6 +303,11 @@ def set_runtime_env(args):
environment["team_env"] = (
teams_json[team].get(context, {}).get(effective_mode, {})
)
# If the job is running in dev environment but no dev environment is defined in teams.json,
# use production environment.
environment["production_team_env"] = (
teams_json[team].get("production", {}).get(effective_mode, {})
)
environment["default_env"] = (
teams_json.get("default", {})
.get(context, {})
Expand All @@ -314,10 +324,10 @@ def set_runtime_env(args):
[
k
for k in [
"chronon",
conf_type,
args.mode.replace("-", "_") if args.mode else None,
]
"chronon",
conf_type,
args.mode.replace("-", "_") if args.mode else None,
]
if k is not None
]
)
Expand All @@ -326,7 +336,7 @@ def set_runtime_env(args):
environment["cli_args"]["CHRONON_DRIVER_JAR"] = args.chronon_jar
environment["cli_args"]["CHRONON_ONLINE_JAR"] = args.online_jar
environment["cli_args"]["CHRONON_ONLINE_CLASS"] = args.online_class
order = ["conf_env", "team_env", "default_env", "common_env", "cli_args"]
order = ["conf_env", "team_env", "production_team_env", "default_env", "common_env", "cli_args"]
print("Setting env variables:")
for key in os.environ:
if any([key in environment[set_key] for set_key in order]):
Expand Down Expand Up @@ -367,7 +377,7 @@ def __init__(self, args, jar_path):
raise e
possible_modes = list(ROUTES[self.conf_type].keys()) + UNIVERSAL_ROUTES
assert (
args.mode in possible_modes
args.mode in possible_modes
), "Invalid mode:{} for conf:{} of type:{}, please choose from {}".format(
args.mode, self.conf, self.conf_type, possible_modes
)
Expand Down Expand Up @@ -443,7 +453,7 @@ def run(self):
)
if self.mode == "streaming":
assert (
len(filtered_apps) == 1
len(filtered_apps) == 1
), "More than one found, please kill them all"
print("All good. No need to start a new app.")
return
Expand Down Expand Up @@ -568,6 +578,12 @@ def set_defaults(parser):
required=False,
help="Conf param - required for every mode except fetch",
)
parser.add_argument(
"--env",
required=False,
default='dev',
help="Running environment - default to be dev"
)
parser.add_argument("--mode", choices=MODE_ARGS.keys())
parser.add_argument("--ds", help="the end partition to backfill the data")
parser.add_argument(
Expand All @@ -589,7 +605,7 @@ def set_defaults(parser):
parser.add_argument(
"--online-jar",
help="Jar containing Online KvStore & Deserializer Impl. "
+ "Used for streaming and metadata-upload mode.",
+ "Used for streaming and metadata-upload mode.",
)
parser.add_argument(
"--online-class",
Expand All @@ -606,7 +622,7 @@ def set_defaults(parser):
parser.add_argument(
"--online-jar-fetch",
help="Path to script that can pull online jar. "
+ "This will run only when a file doesn't exist at location specified by online_jar",
+ "This will run only when a file doesn't exist at location specified by online_jar",
)
parser.add_argument(
"--sub-help",
Expand All @@ -631,7 +647,7 @@ def set_defaults(parser):
parser.add_argument(
"--render-info",
help="Path to script rendering additional information of the given config. "
+ "Only applicable when mode is set to info",
+ "Only applicable when mode is set to info",
)
set_defaults(parser)
pre_parse_args, _ = parser.parse_known_args()
Expand All @@ -652,4 +668,4 @@ def set_defaults(parser):
spark_version=os.environ.get("SPARK_VERSION", args.spark_version),
)
)
Runner(args, os.path.expanduser(jar_path)).run()
Runner(args, os.path.expanduser(jar_path)).run()

0 comments on commit ec55e43

Please sign in to comment.