Fix a fd leak issue (#575)
Signed-off-by: Konstantinos Kallas <[email protected]>
angelhof authored Jun 18, 2022
1 parent d1d9ca6 commit f736068
Showing 6 changed files with 14 additions and 10 deletions.
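The leak comes from tempfile.mkstemp, which creates the temporary file and also returns an open OS-level file descriptor together with the path. The old ptempfile forwarded both, and every call site discarded the descriptor without closing it, so each temporary file cost one leaked fd. A minimal sketch of the leaky and fixed patterns, using only the standard library:

```python
import os
import tempfile

# Leaky pattern (pre-fix): mkstemp returns (fd, path); discarding fd leaves it open.
_, path = tempfile.mkstemp()

# Fixed pattern: close the descriptor immediately and keep only the path.
fd, path = tempfile.mkstemp()
os.close(fd)
```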
4 changes: 2 additions & 2 deletions compiler/ast_to_ir.py
@@ -863,15 +863,15 @@ def preprocess_node_case(ast_node, irFileGen, config, last_object=False):
## If we need to disable parallel pipelines, e.g., if we are in the context of an if,
## or if we are at the end of a script, then we set a variable.
def replace_df_region(asts, irFileGen, config, disable_parallel_pipelines=False, ast_text=None):
-    _, ir_filename = ptempfile()
+    ir_filename = ptempfile()

    ## Serialize the node in a file
    with open(ir_filename, "wb") as ir_file:
        pickle.dump(asts, ir_file)

    ## Serialize the candidate df_region asts back to shell
    ## so that the sequential script can be run in parallel to the compilation.
-    _, sequential_script_file_name = ptempfile()
+    sequential_script_file_name = ptempfile()
    ## If we don't have the original ast text, we need to unparse the ast
    if (ast_text is None):
        kv_asts = [ast_node_to_untyped_deep(ast) for ast in asts]
4 changes: 2 additions & 2 deletions compiler/dspash/ir_helper.py
@@ -47,7 +47,7 @@ def save_configs(graph:IR, dfs_configs_paths: Dict[HDFSFileConfig, str]):
            resource : DFSSplitResource = edge.get_resource()
            config: HDFSFileConfig = resource.config
            if config not in dfs_configs_paths:
-                _, config_path = ptempfile()
+                config_path = ptempfile()
                with open(config_path, "w") as f:
                    f.write(config)
                dfs_configs_paths[config] = config_path
@@ -57,7 +57,7 @@ def save_configs(graph:IR, dfs_configs_paths: Dict[HDFSFileConfig, str]):
            resource.set_config_path(config_path)

def to_shell_file(graph: IR, args) -> str:
-    _, filename = ptempfile()
+    filename = ptempfile()

    dirs = set()
    for edge in graph.all_fids():
4 changes: 2 additions & 2 deletions compiler/pash.py
@@ -63,7 +63,7 @@ def preprocess_and_execute_asts(ast_objects, args, input_script_arguments, shell
    preprocessed_shell_script = preprocess_ast(ast_objects, args)

    ## Write the new shell script to a file to execute
-    _, fname = ptempfile()
+    fname = ptempfile()
    log("Preprocessed script stored in:", fname)
    with open(fname, 'w') as new_shell_file:
        new_shell_file.write(preprocessed_shell_script)
@@ -190,7 +190,7 @@ def parse_args():
    shell_name = "pash"

    if args.command is not None:
-        _, fname = ptempfile()
+        fname = ptempfile()
        with open(fname, 'w') as f:
            f.write(args.command)
        ## If the shell is invoked with -c and arguments after it, then these arguments
4 changes: 2 additions & 2 deletions compiler/pash_runtime.py
@@ -116,7 +116,7 @@ def compile_optimize_output_script(ir_filename, compiled_script_file, args, comp
    ## which should be translated to a parallel script.
    if(isinstance(optimized_ast_or_ir, IR)):
        if args.distributed_exec:
-            _, ir_filename = ptempfile()
+            ir_filename = ptempfile()
            script_to_execute = f"$PASH_TOP/compiler/dspash/remote_exec_graph.sh {ir_filename}\n"
            ## This might not be needed anymore (since the output script is output anyway)
            ## TODO: This is probably useless, remove
@@ -381,7 +381,7 @@ def split_hdfs_cat_input(hdfs_cat, next_node, graph, fileIdGen):

    # Create a cat command per file block
    file_config = hdfs_utils.get_file_config(hdfs_filepath)
-    _, dummy_config_path = ptempfile() # Dummy config file, should be updated by workers
+    dummy_config_path = ptempfile() # Dummy config file, should be updated by workers
    for split_num, block in enumerate(file_config.blocks):
        resource = DFSSplitResource(file_config.dumps(), dummy_config_path, split_num, block.hosts)
        block_fid = fileIdGen.next_file_id()
2 changes: 1 addition & 1 deletion compiler/pash_runtime_daemon.py
@@ -82,7 +82,7 @@ def init_bash_mirror_subprocess():
echo=False)
    ## If we are in debug mode also log the bash's output
    if (config.pash_args.debug >= 1):
-        _, file_to_save_output = ptempfile()
+        file_to_save_output = ptempfile()
        log("bash mirror log saved in:", file_to_save_output)
        fout = open(file_to_save_output, "w")
        p.logfile = fout
6 changes: 5 additions & 1 deletion compiler/util.py
@@ -1,4 +1,5 @@
from datetime import timedelta
+import os
import sys
import config
import tempfile
@@ -40,4 +41,7 @@ def log(*args, end='\n', level=1):
        print(config.LOGGING_PREFIX, *args, file=f, end=end, flush=True)

def ptempfile():
-    return tempfile.mkstemp(dir=config.PASH_TMP_PREFIX)
+    fd, name = tempfile.mkstemp(dir=config.PASH_TMP_PREFIX)
+    ## TODO: Get a name without opening the fd too if possible
+    os.close(fd)
+    return name
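
One way to sanity-check the fix is to count open descriptors before and after a burst of temporary-file creations. The sketch below is a rough, Linux-only check (it counts the entries under /proc/self/fd) and re-implements the fixed helper standalone rather than importing PaSh's config module:

```python
import os
import tempfile

def ptempfile(tmp_dir=None):
    ## Mirrors the fixed helper: create the file, close the fd, return only the path.
    fd, name = tempfile.mkstemp(dir=tmp_dir)
    os.close(fd)
    return name

def open_fds():
    ## Linux-specific: each entry in /proc/self/fd is one open descriptor.
    return len(os.listdir("/proc/self/fd"))

before = open_fds()
paths = [ptempfile() for _ in range(100)]
after = open_fds()
print("open fds before/after:", before, after)  # equal now that the fd is closed

## Clean up the temporary files created by the check.
for p in paths:
    os.remove(p)
```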
