Quickstart
Spark
Install
pip install "pyspine[spark]"
Create context
from spinecore.common.context.spark_context import SpineSparkContext
# Obtain the Spark-backed execution context; entries in spark_config use Spark
# property names (here the executor memory) and are presumably forwarded to the
# underlying Spark session — confirm against SpineSparkContext.get_context.
ctx = SpineSparkContext.get_context(spark_config={"spark.executor.memory": "1gb"}) # Example config
Create Workflow
from spinecore.workflows import DagWorkflow
# NOTE(review): workflow_factory() is defined later in this guide; when pasting
# the snippets into a single script, this call must come after that definition.
workflow = workflow_factory()
Add Steps to Workflow
from spinelibs.spark.steps.io.console.console_writer import ConsoleWriter
from spinelibs.spark.steps.io.csv.csv_reader import CsvReader
def __setup_steps():
    """Build the example steps: two CSV readers and one console writer.

    Returns:
        A 3-tuple ``(reader_one, reader_two, writer)`` in that order.
    """
    reader_one = CsvReader("./resources/exp1.csv", has_headers=True)
    reader_two = CsvReader("./resources/exp2.csv", has_headers=True)
    writer = ConsoleWriter()
    return reader_one, reader_two, writer
def workflow_factory():
    """Assemble the example DAG: both CSV readers feed the console writer.

    Returns:
        A ``DagWorkflow`` named ``"SparkExample"`` with the steps wired up.
    """
    dag = DagWorkflow("SparkExample")
    reader_one, reader_two, writer = __setup_steps()
    # The console writer is scheduled to run after each reader completes.
    dag.add_after([writer], [reader_one])
    dag.add_after([writer], [reader_two])
    return dag
Execute Workflow
async def main():
    """Create the Spark context, build the workflow, and execute it."""
    # NOTE(review): `execute` is never imported in this guide — it presumably
    # comes from the spinecore execution API; confirm the import path.
    ctx = SpineSparkContext.get_context(spark_config={"spark.executor.memory": "1gb"})  # Example config
    workflow = workflow_factory()
    await execute(ctx, workflow)


if __name__ == '__main__':
    # Fix: asyncio was used here but never imported anywhere in the guide.
    import asyncio

    asyncio.run(main())