Skip to content

Commit 37f4dc2

Browse files
committed
fixed workflow step input
1 parent ae0507d commit 37f4dc2

6 files changed

Lines changed: 114 additions & 97 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
- fixed a bug with Dockerfile path resolution
99
- handle NetworkAccess Requirment in runner
1010
- inherit parents requirements correclty
11-
- ramping up runner conformance from 160/378 to 186/378
11+
- ramping up runner conformance from 160/378 to 188/378
1212

1313
# v0.5.2
1414
## 🐛 Bugfixes

crates/cwl-execution/src/runner.rs

Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::{
1818
};
1919
use cwl::{
2020
clt::{Argument, Command, CommandLineTool},
21-
inputs::{CommandLineBinding, WorkflowStepInput},
21+
inputs::CommandLineBinding,
2222
requirements::{DockerRequirement, InlineJavascriptRequirement, StringOrInclude},
2323
types::{CWLType, DefaultValue, Directory, Entry, File, PathItem},
2424
wf::{StringOrDocument, Workflow},
@@ -75,44 +75,29 @@ pub fn run_workflow(
7575

7676
//map inputs to correct fields
7777
let mut step_inputs = HashMap::new();
78-
for (key, input) in &step.in_ {
79-
match input {
80-
WorkflowStepInput::String(in_string) => {
81-
let parts: Vec<&str> = in_string.split('/').collect();
82-
if parts.len() == 2 {
83-
if let Some(out_value) = outputs.get(in_string) {
84-
step_inputs.insert(key.to_string(), out_value.to_default_value());
85-
}
86-
} else if let Some(input) = workflow.inputs.iter().find(|i| i.id == *in_string) {
87-
let value = evaluate_input(input, &input_values.inputs)?;
88-
step_inputs.insert(key.to_string(), value.to_owned());
89-
}
78+
for parameter in &step.in_ {
79+
let source = parameter.source.as_deref().unwrap_or_default();
80+
let source_parts: Vec<&str> = source.split('/').collect();
81+
82+
//try output
83+
if source_parts.len() == 2 {
84+
if let Some(out_value) = outputs.get(source) {
85+
step_inputs.insert(parameter.id.to_string(), out_value.to_default_value());
86+
continue;
9087
}
91-
WorkflowStepInput::Parameter(parameter) => {
92-
let source = parameter.source.as_deref().unwrap_or_default();
93-
let source_parts: Vec<&str> = source.split('/').collect();
94-
95-
if source_parts.len() == 2 {
96-
//try output
97-
if let Some(out_value) = outputs.get(source) {
98-
step_inputs.insert(key.to_string(), out_value.to_default_value());
99-
continue;
100-
}
101-
}
102-
103-
//try default
104-
if let Some(default) = &parameter.default {
105-
step_inputs.entry(key.to_string()).or_insert(default.to_owned());
106-
}
88+
}
89+
//try default
90+
if let Some(default) = &parameter.default {
91+
step_inputs.entry(parameter.id.to_string()).or_insert(default.to_owned());
92+
}
10793

108-
if let Some(input) = workflow.inputs.iter().find(|i| i.id == *source) {
109-
let value = evaluate_input(input, &input_values.inputs)?;
110-
match value {
111-
DefaultValue::Any(val) if val.is_null() => continue,
112-
_ => {
113-
step_inputs.insert(key.to_string(), value.to_owned());
114-
}
115-
}
94+
//try input
95+
if let Some(input) = workflow.inputs.iter().find(|i| i.id == *source) {
96+
let value = evaluate_input(input, &input_values.inputs)?;
97+
match value {
98+
DefaultValue::Any(val) if val.is_null() => continue,
99+
_ => {
100+
step_inputs.insert(parameter.id.to_string(), value.to_owned());
116101
}
117102
}
118103
}
@@ -244,7 +229,7 @@ pub fn run_tool(
244229

245230
//build runtime object
246231
let mut runtime = RuntimeEnvironment::initialize(tool, &input_values, dir.path(), tool_path, tmp_dir.path())?;
247-
232+
248233
//replace inputs and runtime placeholders in tool with the actual values
249234
set_placeholder_values(tool, &runtime, &mut input_values);
250235
runtime.environment = collect_environment(&input_values);

crates/cwl/src/inputs.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -121,28 +121,34 @@ where
121121
Ok(parameters)
122122
}
123123

124-
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
125-
#[serde(untagged)]
126-
pub enum WorkflowStepInput {
127-
String(String),
128-
Parameter(Box<WorkflowStepInputParameter>),
129-
}
130-
131-
impl Default for WorkflowStepInput {
132-
fn default() -> Self {
133-
WorkflowStepInput::String(String::default())
134-
}
135-
}
136-
137124
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
138125
#[serde(rename_all = "camelCase")]
139126
pub struct WorkflowStepInputParameter {
127+
#[serde(default)]
128+
pub id: String,
140129
#[serde(skip_serializing_if = "Option::is_none")]
141130
pub source: Option<String>,
142131
#[serde(skip_serializing_if = "Option::is_none")]
143132
pub default: Option<DefaultValue>,
144133
}
145134

135+
impl WorkflowStepInputParameter {
136+
pub fn with_id(mut self, id: impl ToString) -> Self {
137+
self.id = id.to_string();
138+
self
139+
}
140+
141+
pub fn with_source(mut self, source: impl ToString) -> Self {
142+
self.source = Some(source.to_string());
143+
self
144+
}
145+
146+
pub fn with_default(mut self, f: DefaultValue) -> Self {
147+
self.default = Some(f);
148+
self
149+
}
150+
}
151+
146152
#[cfg(test)]
147153
mod tests {
148154
use super::*;
@@ -154,10 +160,4 @@ mod tests {
154160
input.set_id("test".to_string());
155161
assert_eq!(input.id(), "test");
156162
}
157-
158-
#[test]
159-
pub fn test_workflow_step_input_default() {
160-
let input = WorkflowStepInput::default();
161-
assert_eq!(input, WorkflowStepInput::String(String::default()));
162-
}
163163
}

crates/cwl/src/wf.rs

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use super::{
22
deserialize::{deserialize_list, Identifiable},
3-
inputs::WorkflowStepInput,
3+
inputs::WorkflowStepInputParameter,
44
outputs::WorkflowOutputParameter,
55
requirements::{deserialize_hints, deserialize_requirements, Requirement},
66
CWLDocument, DocumentBase,
77
};
8-
use serde::{Deserialize, Serialize};
8+
use serde::{Deserialize, Deserializer, Serialize};
9+
use serde_yaml::Value;
910
use std::{
1011
collections::{HashMap, VecDeque},
1112
ops::{Deref, DerefMut},
@@ -79,15 +80,9 @@ impl Workflow {
7980

8081
/// Checks whether the `Workflow` has a `WorkflowStep` with an input of id `id`
8182
pub fn has_step_input(&self, id: &str) -> bool {
82-
self.steps.iter().any(|step| {
83-
step.in_.clone().into_values().any(|val| {
84-
let src = match val {
85-
WorkflowStepInput::String(str) => str,
86-
WorkflowStepInput::Parameter(par) => par.source.unwrap_or_default(),
87-
};
88-
src == id
89-
})
90-
})
83+
self.steps
84+
.iter()
85+
.any(|step| step.in_.iter().any(|val| val.source == Some(id.to_string())))
9186
}
9287

9388
/// Checks whether the `Workflow` has a `WorkflowStep` with an ouput of id `id`
@@ -117,16 +112,11 @@ impl Workflow {
117112
for step in &self.steps {
118113
in_degree.entry(step.id.clone()).or_insert(0);
119114

120-
for input in step.in_.values() {
121-
let parts: Vec<&str> = match input {
122-
WorkflowStepInput::String(string) => string.split('/').collect(),
123-
WorkflowStepInput::Parameter(parameter) => {
124-
if let Some(source) = &parameter.source {
125-
source.split('/').collect()
126-
} else {
127-
vec![]
128-
}
129-
}
115+
for input in &step.in_ {
116+
let parts: Vec<&str> = if let Some(source) = &input.source {
117+
source.split('/').collect()
118+
} else {
119+
vec![]
130120
};
131121

132122
if parts.len() == 2 {
@@ -167,8 +157,10 @@ impl Workflow {
167157
pub struct WorkflowStep {
168158
#[serde(default)]
169159
pub id: String,
170-
pub run: StringOrDocument,
171-
pub in_: HashMap<String, WorkflowStepInput>,
160+
pub run: StringOrDocument,
161+
#[serde(deserialize_with = "deserialize_workflow_inputs")]
162+
pub in_: Vec<WorkflowStepInputParameter>,
163+
//pub in_: HashMap<String, WorkflowStepInput>,
172164
pub out: Vec<String>,
173165

174166
#[serde(skip_serializing_if = "Vec::is_empty")]
@@ -191,6 +183,41 @@ impl Identifiable for WorkflowStep {
191183
}
192184
}
193185

186+
pub fn deserialize_workflow_inputs<'de, D>(deserializer: D) -> Result<Vec<WorkflowStepInputParameter>, D::Error>
187+
where
188+
D: Deserializer<'de>,
189+
{
190+
let value: Value = Deserialize::deserialize(deserializer)?;
191+
192+
let parameters = match value {
193+
Value::Sequence(seq) => seq
194+
.into_iter()
195+
.map(|item| {
196+
let param: WorkflowStepInputParameter = serde_yaml::from_value(item).map_err(serde::de::Error::custom)?;
197+
Ok(param)
198+
})
199+
.collect::<Result<Vec<_>, _>>()?,
200+
Value::Mapping(map) => map
201+
.into_iter()
202+
.map(|(key, value)| {
203+
let id = key.as_str().ok_or_else(|| serde::de::Error::custom("Expected string key"))?;
204+
let param = if let Value::String(source_str) = value {
205+
WorkflowStepInputParameter::default().with_id(id).with_source(source_str)
206+
} else {
207+
let mut param: WorkflowStepInputParameter = serde_yaml::from_value(value).map_err(serde::de::Error::custom)?;
208+
param.id = id.to_string();
209+
param
210+
};
211+
212+
Ok(param)
213+
})
214+
.collect::<Result<Vec<_>, _>>()?,
215+
_ => return Err(serde::de::Error::custom("Expected sequence or mapping for inputs")),
216+
};
217+
218+
Ok(parameters)
219+
}
220+
194221
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
195222
#[serde(untagged)]
196223
pub enum StringOrDocument {

src/commands/workflow.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use clap::{Args, Subcommand};
77
use colored::Colorize;
88
use cwl::{
99
format::format_cwl,
10-
inputs::WorkflowStepInput,
1110
load_tool, load_workflow,
1211
wf::{StringOrDocument, Workflow},
1312
CWLDocument,
@@ -220,7 +219,7 @@ pub fn get_workflow_status(args: &CreateWorkflowArgs) -> Result<(), Box<dyn Erro
220219
.inputs
221220
.iter()
222221
.map(|input| {
223-
if step.in_.contains_key(&input.id) {
222+
if step.in_.iter().any(|i| i.id == input.id) {
224223
format!("✅ {}", input.id)
225224
} else if input.default.is_some() {
226225
format!("🔘 {}", input.id)
@@ -235,15 +234,11 @@ pub fn get_workflow_status(args: &CreateWorkflowArgs) -> Result<(), Box<dyn Erro
235234
.outputs
236235
.iter()
237236
.map(|output| {
238-
if workflow.steps.iter().any(|s| {
239-
s.in_.clone().into_values().any(|v| {
240-
let src = match v {
241-
WorkflowStepInput::String(str) => str,
242-
WorkflowStepInput::Parameter(par) => par.source.unwrap_or_default(),
243-
};
244-
src == format!("{}/{}", step.id, output.id)
245-
})
246-
}) || workflow.outputs.iter().any(|o| o.output_source == format!("{}/{}", step.id, output.id))
237+
if workflow
238+
.steps
239+
.iter()
240+
.any(|s| s.in_.clone().iter().any(|v| v.source == Some(format!("{}/{}", step.id, output.id))))
241+
|| workflow.outputs.iter().any(|o| o.output_source == format!("{}/{}", step.id, output.id))
247242
{
248243
format!("✅ {}", output.id)
249244
} else {

src/cwl.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::io::{get_workflows_folder, resolve_path};
22
use cwl::{
33
clt::CommandLineTool,
4-
inputs::{CommandInputParameter, WorkflowStepInput},
4+
inputs::{CommandInputParameter, WorkflowStepInputParameter},
55
load_tool,
66
outputs::WorkflowOutputParameter,
77
requirements::{Requirement, WorkDirItem},
@@ -98,7 +98,11 @@ impl Connectable for Workflow {
9898
.find(|step| step.id == to_parts[0])
9999
.unwrap()
100100
.in_
101-
.insert(to_parts[1].to_string(), WorkflowStepInput::String(from_input.to_owned()));
101+
.push(WorkflowStepInputParameter {
102+
id: to_parts[1].to_string(),
103+
source: Some(from_input.to_owned()),
104+
..Default::default()
105+
});
102106

103107
info!("➕ Added or updated connection from inputs.{from_input} to {to} in workflow");
104108

@@ -160,7 +164,11 @@ impl Connectable for Workflow {
160164
}
161165

162166
let step = self.steps.iter_mut().find(|s| s.id == to_parts[0]).unwrap(); //safe here!
163-
step.in_.insert(to_parts[1].to_string(), WorkflowStepInput::String(from.to_string()));
167+
step.in_.push(WorkflowStepInputParameter {
168+
id: to_parts[1].to_string(),
169+
source: Some(from.to_string()),
170+
..Default::default()
171+
});
164172

165173
Ok(())
166174
}
@@ -182,7 +190,8 @@ impl Connectable for Workflow {
182190
// If the step is found, try to remove the connection by removing input from `tool_y` that uses output of `tool_x`
183191
//Input is empty, change that?
184192
if let Some(step) = step {
185-
if step.in_.remove(to_parts[1]).is_some() {
193+
if step.in_.iter().any(|v| v.id == to_parts[1]) {
194+
step.in_.retain(|v| v.id != to_parts[1]);
186195
info!("🔗 Successfully disconnected {from} from {to}");
187196
} else {
188197
warn!("No connection found between {from} and {to}. Nothing to disconnect.");
@@ -203,7 +212,8 @@ impl Connectable for Workflow {
203212
self.inputs.remove(index);
204213
}
205214
if let Some(step) = self.steps.iter_mut().find(|s| s.id == to_parts[0]) {
206-
if step.in_.remove(to_parts[1]).is_some() {
215+
if step.in_.iter().any(|v| v.id == to_parts[1]) {
216+
step.in_.retain(|v| v.id != to_parts[1]);
207217
info!("➖ Successfully disconnected input {from_input} from {to}");
208218
} else {
209219
warn!("No input connection found for {from_input} to disconnect.");

0 commit comments

Comments
 (0)