Skip to content

Commit a7cb6c5

Browse files
committed
the_three_commits_in_april
1 parent b235b3a commit a7cb6c5

File tree

9 files changed

+204
-10
lines changed

9 files changed

+204
-10
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ requires-python = '>=3.9'
6868
'core.stash' = 'aiida.calculations.stash:StashCalculation'
6969
'core.templatereplacer' = 'aiida.calculations.templatereplacer:TemplatereplacerCalculation'
7070
'core.transfer' = 'aiida.calculations.transfer:TransferCalculation'
71+
'core.unstash' = 'aiida.calculations.unstash:UnstashCalculation'
7172

7273
[project.entry-points.'aiida.calculations.importers']
7374
'core.arithmetic.add' = 'aiida.calculations.importers.arithmetic.add:ArithmeticAddCalculationImporter'

src/aiida/calculations/stash.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class StashCalculation(CalcJob):
5454
5555
inputs = {
5656
'metadata': {
57-
'computer': Computer.collection.get(label="localhost"),
57+
'computer': load_computer(label="localhost"),
5858
'options': {
5959
'resources': {'num_machines': 1},
6060
'stash': {
@@ -64,7 +64,8 @@ class StashCalculation(CalcJob):
6464
},
6565
},
6666
},
67-
'source_node': node_1,
67+
'source_node': <RemoteData_NODE>,
68+
'code': <MY_CODE>
6869
}
6970
"""
7071

@@ -91,6 +92,7 @@ def define(cls, spec):
9192
'num_machines': 1,
9293
'num_mpiprocs_per_machine': 1,
9394
}
95+
9496
spec.inputs['metadata']['options']['input_filename'].default = 'aiida.in'
9597
spec.inputs['metadata']['options']['output_filename'].default = 'aiida.out'
9698

src/aiida/calculations/unstash.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
###########################################################################
2+
# Copyright (c), The AiiDA team. All rights reserved. #
3+
# This file is part of the AiiDA code. #
4+
# #
5+
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
6+
# For further information on the license, see the LICENSE.txt file #
7+
# For further information please visit http://www.aiida.net #
8+
###########################################################################
9+
""""""
10+
11+
from aiida import orm
12+
from aiida.common.datastructures import CalcInfo, CodeInfo, UnStashMode
13+
from aiida.engine import CalcJob
14+
15+
16+
class UnStashCalculation(CalcJob):
17+
"""
18+
Utility to unstash files from a remote folder.
19+
20+
An example of how the input should look like:
21+
22+
.. code-block:: python
23+
24+
inputs = {
25+
'metadata': {
26+
'computer': Computer.collection.get(label="localhost"),
27+
'options': {
28+
'resources': {'num_machines': 1},
29+
'unstash': {
30+
'unstash_mode': UnStashMode.OriginalPlace.value,
31+
'unstash_mode': UnStashMode.NewFolderData.value,
32+
},
33+
},
34+
},
35+
'source_node': node_1,
36+
'custom_command': 'rsync -av aiida.in _aiidasubmit.sh /scratch/my_stashing/',
37+
}
38+
"""
39+
40+
41+
42+
def __init__(self, *args, **kwargs):
43+
super().__init__(*args, **kwargs)
44+
45+
@classmethod
46+
def define(cls, spec):
47+
super().define(spec)
48+
49+
spec.input(
50+
'source_node',
51+
valid_type=(orm.RemoteStashCompressedData, orm.RemoteStashCustomData, orm.RemoteStashFolderData),
52+
required=True,
53+
help='',
54+
)
55+
56+
spec.inputs['metadata']['options']['input_filename'].default = 'aiida.in'
57+
spec.inputs['metadata']['options']['output_filename'].default = 'aiida.out'
58+
# Code is irrelevant for this calculation.
59+
spec.inputs.pop('code', None)
60+
61+
# Ideally one could use the same computer as the one of the `source_node`.
62+
# However, if another computer has access to the directory, we don't want to restrict.`
63+
spec.inputs['metadata']['computer'].required = True
64+
spec.inputs['metadata']['options']['resources'].default = {
65+
'num_machines': 1,
66+
'num_mpiprocs_per_machine': 1,
67+
}
68+
69+
def prepare_for_submission(self, folder):
70+
source_node = self.inputs.get('source_node')
71+
unstash_mode = self.inputs.metadata.options.stash.get('unstash_mode')
72+
73+
74+
calc_info = CalcInfo()
75+
76+
if isinstance(source_node, (orm.RemoteStashCompressedData, orm.RemoteStashFolderData)):
77+
calc_info.skip_submit = True
78+
79+
calc_info.codes_info = []
80+
calc_info.retrieve_list = []
81+
calc_info.local_copy_list = []
82+
calc_info.remote_copy_list = []
83+
calc_info.remote_symlink_list = []
84+
85+
elif isinstance(source_node, orm.RemoteStashCustomData):
86+
custom_command = self.inputs.metadata.options.stash.get('custom_command')
87+
88+
if custom_command is None:
89+
raise ValueError("Input 'custom_command' is required for `StashMode.CUSTOM_SCRIPT` mode.")
90+
91+
if unstash_mode == UnStashMode.OriginalPlace.value:
92+
src_of_the_src= orm.load_node(source_node.source_uuid)
93+
working_directory = src_of_the_src.get_remote_path() # type: ignore[union-attr]
94+
elif unstash_mode == UnStashMode.NewFolderData.value:
95+
# TODO:
96+
# Execmanager should make REmoteDAta
97+
# Generate a path, so the submitted job can put the data directly there.
98+
# Give that path to execmanager to generate and save a RemoteData node.
99+
100+
101+
102+
working_directory = source_node.get_remote_path() # type: ignore[union-attr]
103+
change_dir = f'cd {working_directory}\n'
104+
105+
with folder.open(self.options.input_filename, 'w', encoding='utf8') as handle:
106+
the_scripts = change_dir + '\n' + custom_command
107+
handle.write(the_scripts)
108+
109+
code_info = CodeInfo()
110+
code_info.stdin_name = self.options.input_filename
111+
code_info.stdout_name = self.options.output_filename
112+
113+
calc_info.codes_info = [code_info]
114+
calc_info.retrieve_list = [self.options.output_filename]
115+
calc_info.local_copy_list = []
116+
calc_info.remote_copy_list = []
117+
calc_info.remote_symlink_list = []
118+
119+
# The stashed node is going to be created by ``execmanager``, once the job is finished.
120+
121+
122+
123+
return calc_info

src/aiida/common/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
'StoringNotAllowed',
7979
'TestsNotAllowedError',
8080
'TransportTaskException',
81+
'UnStashMode',
8182
'UniquenessError',
8283
'UnsupportedSpeciesError',
8384
'ValidationError',

src/aiida/common/datastructures.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
from .extendeddicts import DefaultFieldsAttributeDict
1717

18-
__all__ = ('CalcInfo', 'CalcJobState', 'CodeInfo', 'CodeRunMode', 'StashMode')
18+
__all__ = ('CalcInfo', 'CalcJobState', 'CodeInfo', 'CodeRunMode', 'StashMode', 'UnStashMode')
1919

2020

2121
class StashMode(Enum):
@@ -29,6 +29,13 @@ class StashMode(Enum):
2929
SUBMIT_CUSTOM_CODE = 'submit_custom_code'
3030

3131

32+
class UnStashMode(Enum):
33+
"""Mode to use when unstashing files."""
34+
35+
NewFolderData = 'new_folder_data'
36+
OriginalPlace = 'original_place'
37+
38+
3239
class CalcJobState(Enum):
3340
"""The sub state of a CalcJobNode while its Process is in an active state (i.e. Running or Waiting)."""
3441

src/aiida/engine/daemon/execmanager.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,30 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
491491

492492
###
493493

494+
if stash_mode == StashMode.SUBMIT_CUSTOM_CODE.value:
495+
if calculation.process_type != 'aiida.calculations:core.stash':
496+
EXEC_LOGGER.warning(
497+
f'Stashing as {StashMode.SUBMIT_CUSTOM_CODE.value}'
498+
' is only possible through job submission. Stashing skipped.'
499+
)
500+
# Note we could easily support it via `transport.exec_command_wait_async`
501+
# However, that may confuse users, as it's done in a different manner than job-submission
502+
# So just to stay safe, we decided not to provide this feature.
503+
return
504+
505+
remote_stash = RemoteStashCustomData(
506+
computer=calculation.computer,
507+
stash_mode=StashMode(stash_mode),
508+
target_basepath=str(target_base),
509+
source_list=source_list,
510+
)
511+
512+
remote_stash.store()
513+
remote_stash.base.links.add_incoming(calculation, link_type=LinkType.CREATE, link_label='remote_stash')
514+
return
515+
516+
###
517+
494518
if stash_mode == StashMode.COPY.value:
495519
target_basepath = target_base / uuid[:2] / uuid[2:4] / uuid[4:]
496520

src/aiida/engine/processes/calcjobs/calcjob.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,28 @@ def validate_stash_options(stash_options: Any, _: Any) -> Optional[str]:
121121
source_list = stash_options.get('source_list', None)
122122
stash_mode = stash_options.get('stash_mode', None)
123123

124-
if not isinstance(target_base, str) or not os.path.isabs(target_base):
125-
return f'`metadata.options.stash.target_base` should be an absolute filepath, got: {target_base}'
124+
if stash_mode is not StashMode.CUSTOM_SCRIPT.value:
125+
if not isinstance(target_base, str) or not os.path.isabs(target_base):
126+
return f'`metadata.options.stash.target_base` should be an absolute filepath, got: {target_base}'
126127

127-
if not isinstance(source_list, (list, tuple)) or any(
128-
not isinstance(src, str) or os.path.isabs(src) for src in source_list
129-
):
130-
port = 'metadata.options.stash.source_list'
131-
return f'`{port}` should be a list or tuple of relative filepaths, got: {source_list}'
128+
if not isinstance(source_list, (list, tuple)) or any(
129+
not isinstance(src, str) or os.path.isabs(src) for src in source_list
130+
):
131+
port = 'metadata.options.stash.source_list'
132+
return f'`{port}` should be a list or tuple of relative filepaths, got: {source_list}'
133+
134+
if stash_options.get('custom_command', None) is not None:
135+
return f'`metadata.options.stash.target_base` is not an input for {stash_mode} stashing mode'
136+
else:
137+
custom_command = stash_options.get('custom_command', None)
138+
if not isinstance(custom_command, str):
139+
return f'`metadata.options.stash.custom_command` should be a string, got: {custom_command}'
140+
141+
if stash_options.get('target_base', None) is not None:
142+
return '`metadata.options.stash.target_base` is not an input for `StashMode.CUSTOM_SCRIPT` stashing mode'
143+
144+
if stash_options.get('source_list', None) is not None:
145+
return '`metadata.options.stash.source_list` is not an input for `StashMode.CUSTOM_SCRIPT` stashing mode'
132146

133147
try:
134148
StashMode(stash_mode)
@@ -438,6 +452,12 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
438452
required=False,
439453
help='Whether to follow symlinks while stashing or not, specific to StashMode.COMPRESS_* enums',
440454
)
455+
spec.input(
456+
'metadata.options.stash.custom_command',
457+
valid_type=str,
458+
required=False,
459+
help='',
460+
)
441461
spec.output(
442462
'remote_folder',
443463
valid_type=orm.RemoteData,

src/aiida/engine/processes/calcjobs/tasks.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
UPDATE_COMMAND = 'update'
4545
RETRIEVE_COMMAND = 'retrieve'
4646
STASH_COMMAND = 'stash'
47+
UN_STASH_COMMAND = 'unstash'
4748
KILL_COMMAND = 'kill'
4849

4950
RETRY_INTERVAL_OPTION = 'transport.task_retry_initial_interval'
@@ -521,6 +522,12 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
521522
else:
522523
result = self.submit()
523524

525+
elif self._command == UN_STASH_COMMAND:
526+
if node.get_option('unstash') is not None:
527+
await self._launch_task(task_unstash_job, node, transport_queue)
528+
529+
result = self.retrieve(monitor_result=self._monitor_result)
530+
524531
elif self._command == SUBMIT_COMMAND:
525532
result = await self._launch_task(task_submit_job, node, transport_queue)
526533

@@ -555,6 +562,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
555562
elif self._command == STASH_COMMAND:
556563
if node.get_option('stash'):
557564
await self._launch_task(task_stash_job, node, transport_queue)
565+
558566
result = self.retrieve(monitor_result=self._monitor_result)
559567

560568
elif self._command == RETRIEVE_COMMAND:
@@ -667,6 +675,13 @@ def stash(self, monitor_result: CalcJobMonitorResult | None = None) -> 'Waiting'
667675
ProcessState.WAITING, None, msg=msg, data={'command': STASH_COMMAND, 'monitor_result': monitor_result}
668676
)
669677

678+
def unstash(self, monitor_result: CalcJobMonitorResult | None = None) -> 'Waiting':
679+
"""Return the `Waiting` state that will `unstash` the `CalcJob`."""
680+
msg = 'Waiting to unstash'
681+
return self.create_state( # type: ignore[return-value]
682+
ProcessState.WAITING, None, msg=msg, data={'command': UN_STASH_COMMAND, 'monitor_result': monitor_result}
683+
)
684+
670685
def retrieve(self, monitor_result: CalcJobMonitorResult | None = None) -> 'Waiting':
671686
"""Return the `Waiting` state that will `retrieve` the `CalcJob`."""
672687
msg = 'Waiting to retrieve'

tests/orm/test_fields/fields_aiida.data.core.remote.stash.custom.RemoteStashCustomData.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ stash_mode: QbField('stash_mode', dtype=<enum 'StashMode'>, is_attribute=True)
2020
target_basepath: QbStrField('target_basepath', dtype=<class 'str'>, is_attribute=True)
2121
user: QbNumericField('user', dtype=typing.Optional[int], is_attribute=False)
2222
uuid: QbStrField('uuid', dtype=typing.Optional[str], is_attribute=False)
23+

0 commit comments

Comments
 (0)