Skip to main content

Quickstart

Polars

Install

# Quote the requirement so shells such as zsh do not treat "[polars]" as a glob pattern.
pip install "pyspine[polars]"

Create context

from spinecore.common.context.polars_context import SpinePolarsContext

# Obtain the Polars-backed Spine context in lazy mode.
# NOTE(review): the "set_tbl_rows" key presumably maps to polars.Config.set_tbl_rows
# (number of table rows shown when printing) -- confirm in SpinePolarsContext.
ctx = SpinePolarsContext.get_context(
    lazy=True,
    config=dict(set_tbl_rows=20),
)

Create a workflow

from spinecore.workflows import DagWorkflow

# Create an empty DAG workflow. workflow_factory() is defined further down, so it
# cannot be called at this point; it builds the wired example
# (reader -> GUID column -> writer) that main() runs.
workflow = DagWorkflow("PolarsExample")

Add steps to the workflow

from spinelibs.polars.steps.general.GUIDColumn import GUIDColumn
from spinelibs.polars.steps.io.csv.csv_reader import CsvReader
from spinelibs.polars.steps.io.csv.csv_writer import CSVWriter

def __setup_steps(
    input_path="../polars_exp/resources/migrations.csv",
    output_path="./resources/output.csv",
):
    """Create the three steps used by the example workflow.

    Args:
        input_path: CSV file handed to ``CsvReader`` (read with ``has_headers=True``).
            Defaults to the path the original example hard-coded.
        output_path: CSV file handed to ``CSVWriter``. Defaults to the path the
            original example hard-coded.

    Returns:
        A ``(migrations_reader, guid_column, csv_writer)`` tuple, in that order.

    NOTE(review): both defaults are relative paths and presumably resolve against
    the current working directory -- confirm with CsvReader/CSVWriter.
    """
    migrations_reader = CsvReader(input_path, has_headers=True)
    csv_writer = CSVWriter(output_path)
    # NOTE(review): assumes GUIDColumn derives the "guid" column from the input's
    # "company" and "year" columns -- confirm against GUIDColumn.
    guid_column = GUIDColumn(["company", "year"], guid_column_name="guid")
    return migrations_reader, guid_column, csv_writer

def workflow_factory():
    """Build and return the "PolarsExample" DAG workflow.

    The steps from ``__setup_steps()`` are chained with ``add_after``: the GUID
    column step is added after the CSV reader, and the CSV writer after the GUID
    column step.
    """
    dag = DagWorkflow("PolarsExample")
    reader, guid_step, writer = __setup_steps()
    # Each pair is (steps to add, steps they are added after), passed to add_after.
    edges = (
        ([guid_step], [reader]),
        ([writer], [guid_step]),
    )
    for new_steps, predecessors in edges:
        dag.add_after(new_steps, predecessors)
    return dag

Execute the workflow

async def main():
    """Create a lazy Polars context, build the example workflow, and await its execution."""
    polars_ctx = SpinePolarsContext.get_context(lazy=True, config={"set_tbl_rows": 20})
    example_workflow = workflow_factory()
    # NOTE(review): `execute` is not imported anywhere on this page; add the
    # import from its Spine module before running -- confirm which module.
    await execute(polars_ctx, example_workflow)


if __name__ == '__main__':
    # asyncio is not imported elsewhere on this page; import it here so the
    # script entry point does not fail with NameError.
    import asyncio

    # Run the async main() coroutine to completion on a fresh event loop.
    asyncio.run(main())