At IMC, our interns don’t just shadow—they solve. Each project they take on is designed to make a tangible impact, tackling real challenges and solving real business problems.
This summer, as part of IMC’s 2024-25 Sydney internship program, Keats and Brendan, in collaboration with their mentor, Anthony, created GOOSE—a powerful tool to optimise data pipelines and improve computational efficiency.
This blog takes you through their five-week journey, from scoping the problem to building a fully functional optimisation tool that enhances research efficiency.
The brief
IMC develops a distributed data flow execution framework that researchers use to distribute heavy computations across multiple machines and run them in parallel.
The problem is that, for a desired data pipeline, it can be hard to determine how to partition tasks and which worker specs should be allocated to each task. As a result, submitted data pipelines can often be slow and inefficient, wasting resources that could be used elsewhere. Previously, other developers had tried manually optimising researchers’ flows, but this proved tedious and time-consuming.
Introducing GOOSE
Keats and Brendan’s solution to this problem was GOOSE (Graph ExecutiOn Optimisation ServicE), designed to systematically optimise these data pipelines when they are expressed in graph form. In this representation, individual tasks become nodes, and edges capture both the data dependencies between tasks and their communication over the network.
Week 1
The first step was to scope the problem. This involved discussing what other developers on the team had observed from their earlier manual optimisations and familiarising ourselves with the distributed data flow execution framework itself.
In scoping the problem, we determined that GOOSE would need two main components:
The first of these became what we called TESS (Task Execution Stats Service). Given a task name, TESS should return relevant statistics about that task, such as execution time, CPU cost and memory cost. Our initial goal for TESS was to provide historical averages of stats for tasks run in the distributed data flow execution framework, with the intention that it could later produce more sophisticated estimates, such as accounting for differences in parameters between runs.
The other component, named GEOS (Graph Execution Optimisation Service), would then be responsible for annotating an execution graph (provided by the user) with costs and then performing optimisations on it before returning the result. The idea here is that the optimisation step should be highly configurable; the ultimate goal is to allow the client to choose from a number of different optimisations and compose them in different ways.
// GOOSE ARCHITECTURE DIAGRAM
We decided that these components should be microservices so that they would be independent (i.e. graphs could still be optimised in GEOS, using a fall-back method, even if TESS was down), wouldn’t have to be written in Python, and could be used with any graph-based execution framework (i.e. the services aren’t coupled to the previously mentioned data flow execution framework). Initially, these are just HTTP-based RESTful APIs written in Java, which could be migrated to gRPC or similar, or rewritten in another language (e.g. C++), if they were found to be a bottleneck.
With all this in mind, one of the major tasks of our first week was to define the APIs for these services. This meant we also needed to decide on the data model for the generic graph representation that GEOS would accept as input from the client, ensuring it isn’t coupled to any one distributed data flow execution framework.
In terms of a Java class hierarchy, this consists of a GraphElement interface with Node and DirectedEdge classes that implement it. There is also a Graph class that extends Node, meaning graphs can be composed of other graphs (i.e. subgraphs). This is necessary because, in the final optimised graph, we can’t lose any information about the flow of data through tasks, even if those tasks are combined into the same node.
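A rough sketch of this hierarchy (field and method names here are illustrative rather than the exact internal API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Common supertype for anything in the graph that can carry attributes (e.g. costs).
interface GraphElement {
    Map<String, Object> attributes();
}

// A single task in the pipeline.
class Node implements GraphElement {
    private final String name;
    private final Map<String, Object> attributes = new HashMap<>();

    Node(String name) { this.name = name; }

    String name() { return name; }

    @Override
    public Map<String, Object> attributes() { return attributes; }
}

// A data dependency between two tasks, i.e. communication over the network.
class DirectedEdge implements GraphElement {
    private final Node source;
    private final Node target;
    private final Map<String, Object> attributes = new HashMap<>();

    DirectedEdge(Node source, Node target) {
        this.source = source;
        this.target = target;
    }

    Node source() { return source; }
    Node target() { return target; }

    @Override
    public Map<String, Object> attributes() { return attributes; }
}

// A Graph is itself a Node, so fused tasks can be kept as a subgraph
// without losing information about the flow of data between them.
class Graph extends Node {
    private final List<Node> nodes = new ArrayList<>();
    private final List<DirectedEdge> edges = new ArrayList<>();

    Graph(String name) { super(name); }

    List<Node> nodes() { return nodes; }
    List<DirectedEdge> edges() { return edges; }
}
```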
Weeks 2-3
On the TESS side of things, we implemented an endpoint that allows for batched requests for statistics of tasks that have run in the distributed data flow execution framework. This was done by querying an existing PostgreSQL database that had tables for task execution information, as well as memory and CPU measurements of tasks run in the framework. Given multiple runs of a task, TESS therefore returns the task’s average total execution time along with its peak and average memory and CPU usage.
// TESS API example
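The exact request and response shapes aren’t reproduced here, but a client-side view of the batched endpoint might look roughly like the following (the type and field names are illustrative, not TESS’s actual API):

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;

// Illustrative shape of the per-task statistics TESS returns.
record TaskStats(
        Duration averageExecutionTime,
        double averageCpu,
        double peakCpu,
        long averageMemoryBytes,
        long peakMemoryBytes) {}

// Illustrative client for the batched endpoint: one HTTP call returns
// statistics for many task names at once.
interface TessClient {
    Map<String, TaskStats> statsFor(List<String> taskNames);
}
```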
With TESS operational, GEOS still needed a way to annotate the nodes and edges of the client-provided graph with costs. We decided that a Cost would be stored as an attribute on a GraphElement and provided a VectorisedCost implementation representing the different costs of a graph element as a vector. Since there is no single obvious way to determine this vector (e.g. using TESS versus using a constant value), we created a generic CostAnnotator interface whose implementations define this behaviour.
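A minimal sketch of how these pieces could fit together, reusing the graph types from the week 1 sketch (class and attribute names are illustrative):

```java
// A cost stored as an attribute on a GraphElement; VectorisedCost simply
// holds the different cost dimensions (e.g. time, CPU, memory) as a vector.
interface Cost {
    double[] asVector();
}

record VectorisedCost(double[] values) implements Cost {
    @Override
    public double[] asVector() { return values; }
}

// Implementations decide where the numbers come from: TESS lookups,
// constant fall-back values, and so on.
interface CostAnnotator {
    void annotate(Graph graph);
}

// Trivial fall-back annotator assigning the same constant cost to every
// node, e.g. for when TESS is unavailable.
class ConstantCostAnnotator implements CostAnnotator {
    private final double value;

    ConstantCostAnnotator(double value) { this.value = value; }

    @Override
    public void annotate(Graph graph) {
        for (Node node : graph.nodes()) {
            node.attributes().put("cost", new VectorisedCost(new double[] { value }));
        }
    }
}
```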
Once costs have been annotated, an implementation of the Optimiser interface (which exposes a single graph-to-graph method) can be applied to the graph. We focused on implementing two main optimisations in GEOS:
Linear fusion targets graphs with unnecessary communication—graphs that don’t benefit from splitting their tasks over multiple workers. In cases where parallelism isn’t necessary (e.g. a linear chain), the communication costs can be cut out completely and all tasks can be performed on the same worker. Thus, if a given graph is found to be less costly when run on one worker, linear fusion will fuse the graph into a single node.
// Diagram portraying linear fusion being applied to an entire graph
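A heavily simplified sketch of the Optimiser contract and a linear-fusion-style implementation against it (the CostEstimator helper is invented for illustration; GEOS’s real logic works on the annotated vector costs):

```java
// An optimisation is a graph-to-graph transformation.
interface Optimiser {
    Graph optimise(Graph graph);
}

// Hypothetical cost model used by the sketch below.
interface CostEstimator {
    double distributedCost(Graph graph);   // includes network/communication costs
    double singleWorkerCost(Graph graph);  // everything on one worker, no communication
}

// Simplified linear fusion: if the whole graph is cheaper on one worker,
// collapse it into a single node (kept as a subgraph so the internal data
// flow is not lost).
class LinearFusionOptimiser implements Optimiser {
    private final CostEstimator estimator;

    LinearFusionOptimiser(CostEstimator estimator) { this.estimator = estimator; }

    @Override
    public Graph optimise(Graph graph) {
        if (estimator.singleWorkerCost(graph) >= estimator.distributedCost(graph)) {
            return graph; // fusing would not reduce the cost
        }
        Graph fused = new Graph(graph.name() + "-fused");
        fused.nodes().add(graph); // the original graph becomes a single node of the result
        return fused;
    }
}
```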
Branch fusion, on the other hand, doesn’t aim to reduce any of the observable costs of a graph and instead targets the cost of scheduling. Given a number of completely independent tasks n and a number of available workers m, if n > m, then some subset of those tasks will have to be scheduled on the same worker. This scheduling occurs at runtime in the distributed data flow execution framework and can be particularly costly when there is a high number of tasks, even causing flows to fail in some cases. Branch fusion mitigates this by performing scheduling upfront and partitioning nodes into groups based on m, the number of available workers. Nodes in the same group are fused into a single node.
This partitioning of nodes is a configurable strategy used in branch fusion. Initially, we provide an approximate greedy strategy that takes the next biggest node and places it in the current smallest group; a sketch of this follows the diagram below. We use an approximation because an exact solution reduces to the K-partitioning problem, which is NP-hard.
// Diagram portraying branch fusion being applied to an entire graph
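As a rough illustration of that greedy strategy, assuming each node’s annotated cost can be collapsed to a single number (the ToDoubleFunction parameter and class name are illustrative):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToDoubleFunction;

// Greedy approximation of K-partitioning: repeatedly place the largest
// remaining node into the currently smallest group. Each group is later
// fused into a single node, one per available worker.
class GreedyPartitioner {

    List<List<Node>> partition(List<Node> nodes, int workers, ToDoubleFunction<Node> cost) {
        List<List<Node>> groups = new ArrayList<>();
        double[] groupCost = new double[workers];
        for (int i = 0; i < workers; i++) {
            groups.add(new ArrayList<>());
        }

        // Consider nodes from most to least expensive.
        List<Node> byDescendingCost = new ArrayList<>(nodes);
        byDescendingCost.sort(Comparator.comparingDouble(cost).reversed());

        for (Node node : byDescendingCost) {
            int smallest = 0;
            for (int i = 1; i < workers; i++) {
                if (groupCost[i] < groupCost[smallest]) {
                    smallest = i;
                }
            }
            groups.get(smallest).add(node);
            groupCost[smallest] += cost.applyAsDouble(node);
        }
        return groups;
    }
}
```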
We also made some simplifying assumptions at this stage, most notably that optimisations would only ever be applied to the whole graph.
With these two types of optimisations implemented, we gave an intermediate demonstration to our team, deployed both GEOS and TESS and wrote unit tests for both components.
Week 4
From the initial demonstration, it was observed that TESS performed relatively poorly, with requests taking several minutes, even when limited to task runs from the last few hours. This was due to the high volume of data and the lack of an index on task names in the CPU and memory measurement tables, which were not originally designed for our querying needs.
Our solution was therefore to create a new table that aggregated and pre-processed the majority of the information we were querying in TESS. The intention is that this aggregated table is updated periodically (e.g. daily) as a background task, so that normal TESS requests can query the pre-processed data instead and run much more efficiently.
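As a rough sketch of what such a periodic refresh could look like, assuming JDBC access and a hypothetical task_stats_aggregate table (all table and column names are invented for illustration):

```java
import java.sql.Connection;
import java.sql.Statement;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;

// Periodically rebuilds a pre-aggregated statistics table so that normal
// TESS requests only touch the small, indexed aggregate.
class StatsAggregationJob {
    private final DataSource dataSource;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    StatsAggregationJob(DataSource dataSource) { this.dataSource = dataSource; }

    void start() {
        scheduler.scheduleAtFixedRate(this::refresh, 0, 1, TimeUnit.DAYS);
    }

    private void refresh() {
        // Hypothetical aggregation: average runtime and peak memory/CPU per task name.
        String sql = """
                INSERT INTO task_stats_aggregate (task_name, avg_runtime_ms, peak_memory_bytes, peak_cpu)
                SELECT task_name, AVG(runtime_ms), MAX(memory_bytes), MAX(cpu)
                FROM task_executions
                GROUP BY task_name
                ON CONFLICT (task_name) DO UPDATE SET
                    avg_runtime_ms = EXCLUDED.avg_runtime_ms,
                    peak_memory_bytes = EXCLUDED.peak_memory_bytes,
                    peak_cpu = EXCLUDED.peak_cpu
                """;
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {
            statement.executeUpdate(sql);
        } catch (Exception e) {
            // In practice this would be logged and retried.
            e.printStackTrace();
        }
    }
}
```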
In GEOS, our main focus was to address one of our assumptions from the previous weeks–that optimisations could only be performed on the whole graph. As a result, we introduced the idea of a Decomposer (a subtype of Optimiser), which is responsible for breaking the overall graph into subgraphs that could be optimised independently as if they were complete graphs. Different decomposers were often specialised for different optimisations:
// Linear fusion and branch decomposer diagram
With many possible configurations of decomposers and optimisers, we created the PipelineOptimiser, which is composed of other Optimisers that execute sequentially before returning the final result. This allowed clients to configure and submit multiple optimisation pipelines, enabling GEOS to run them in parallel and return the best result. Additionally, GEOS could be configured to return ‘verbose’ output, displaying all intermediate optimisation steps for each pipeline.
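A minimal sketch of how these compose, reusing the Optimiser interface from the earlier sketch (the method shapes are illustrative):

```java
import java.util.List;

// A Decomposer is itself an Optimiser: it splits the overall graph into
// subgraphs that downstream optimisers can treat as complete graphs.
interface Decomposer extends Optimiser {
    List<Graph> decompose(Graph graph);
}

// Runs a fixed sequence of optimisers, feeding each step the output of the
// previous one. Several differently configured pipelines can then be run in
// parallel and the cheapest result returned to the client.
class PipelineOptimiser implements Optimiser {
    private final List<Optimiser> steps;

    PipelineOptimiser(List<Optimiser> steps) { this.steps = steps; }

    @Override
    public Graph optimise(Graph graph) {
        Graph current = graph;
        for (Optimiser step : steps) {
            current = step.optimise(current);
        }
        return current;
    }
}
```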
Week 5
The main objective for week 5 was to tie up any loose ends and prepare for our final presentation.
While integrating GOOSE with the distributed data flow execution framework developed by IMC, we realised that our generic graph representation lacked a mechanism to preserve the order of task inputs. For example, given a task f(x, y), we needed to ensure that dependencies x and y were passed to f in the correct order in the optimised graph. This required refactoring our graph data model to introduce the concept of endpoints on nodes, which explicitly represented the ordering of inputs and outputs. If a node was a subgraph, it maintained a mapping of its endpoints to the corresponding internal nodes.
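One way such endpoints might be sketched on top of the earlier Node type (the real data model likely differs in its details):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// An ordered input or output slot on a node, so that for a task f(x, y)
// the optimised graph still knows which dependency feeds which argument.
record Endpoint(int position) {}

// Sketch of a node carrying explicit endpoints. When the node is a fused
// subgraph, each external endpoint is mapped to the internal node that
// actually consumes or produces the data.
class EndpointedNode extends Node {
    private final List<Endpoint> inputs = new ArrayList<>();
    private final List<Endpoint> outputs = new ArrayList<>();
    private final Map<Endpoint, Node> endpointToInternalNode = new LinkedHashMap<>();

    EndpointedNode(String name) { super(name); }

    List<Endpoint> inputs() { return inputs; }
    List<Endpoint> outputs() { return outputs; }
    Map<Endpoint, Node> endpointToInternalNode() { return endpointToInternalNode; }
}
```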
Another missing component in GEOS was a mechanism to “unpack” decomposed graphs after optimisation. Previously, decomposers split graphs into subgraphs, allowing optimisers to fuse nodes within those subgraphs. However, there was no step to integrate the resulting optimised subgraph back into its parent graph. The GraphUnpacker was introduced to handle this process.
We also added a CostComparator interface, enabling different strategies for comparing two cost-annotated graphs.
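As a minimal illustrative sketch, the contract could be as small as:

```java
// Strategy for deciding which of two cost-annotated graphs is preferable,
// e.g. by summing their cost vectors or weighting certain dimensions more heavily.
interface CostComparator {
    // Negative if a is cheaper than b, positive if b is cheaper, zero if equal.
    int compare(Graph a, Graph b);
}
```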
Additionally, we built a React-based visualiser for GEOS using the React Flow library. Given a set of pipelines and their verbose outputs, this visualiser displayed every graph returned by GEOS, with annotated costs and an interactive interface for traversing subgraphs.
// Image of frontend visualiser
Future Scope
When designing GOOSE, we prioritised modularity and configurability to support future extensions. Given the vast range of possible optimisation algorithms, there are many potential directions for future development.
With TESS, some additions could include more sophisticated estimates of task statistics, such as accounting for differences in parameters between runs rather than relying purely on historical averages.
On the GEOS side, further optimisations, decomposition strategies and cost comparators could be added by plugging into the existing interfaces.
This project is just one example of countless real-world projects that IMC interns work on. Seeing their work make a genuine impact on the business is the best way for our interns to learn and build on their skills.
Applications are now open for our Sydney internship program. Apply now for our Software Engineering Internship.