A lightweight Elixir project for storing and running reusable data pipelines.

- Apache-2.0 (`LICENSE`)
- Ethical use guidance: `ETHICAL_USE.md`
- Attribution notice: `NOTICE`

Requirements:

- Elixir `~> 1.16`
- Erlang/OTP compatible with your Elixir version

Install dependencies and run the tests:

```shell
mix deps.get
mix test
```

Pre-push CI parity check:

```shell
./scripts/check_before_push.sh
```

ARM64 macOS one-command setup:

```shell
./scripts/install_arm64_macos.sh
```

Version manager sync files:

- `.tool-versions` (asdf)
- `.mise.toml` (mise)

CI/CD:

- Workflow file: `.github/workflows/ci-cd.yml`
- CI runs on pull requests and on pushes to `main` (compile + test).
- CD runs on tags matching `v*` and creates a GitHub Release after the tests pass.

Integrated libraries:

- Broadway: concurrent pipeline execution for row batches.
- NimbleCSV: robust CSV parsing (replaces the earlier manual split-based parsing).
- Oban: job worker support for scheduled and retryable pipeline runs.

Security:

- See `SECURITY.md` for reporting and hardening notes.
- Dynamic atom creation from user input is blocked:
  - pipeline names are resolved against registry keys
  - unknown JSON/CSV keys remain strings instead of becoming atoms
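
The two rules above can be sketched in plain Elixir. This is a minimal illustration that assumes the registry is a map keyed by atoms such as `:user_ingest`. The `SafeKeys` module and its functions are hypothetical and are not the project's code. User input is only ever compared against atoms that already exist, so nothing new enters the atom table.

```elixir
defmodule SafeKeys do
  # Hypothetical sketch: map user-supplied strings onto atoms that already
  # exist in a trusted set, without ever calling String.to_atom/1.

  # Assumes the registry is a map keyed by atoms (e.g. :user_ingest).
  # The input string is compared against each existing key, so no atom is created.
  def resolve_pipeline(registry, name) when is_map(registry) and is_binary(name) do
    case Enum.find(Map.keys(registry), fn key -> Atom.to_string(key) == name end) do
      nil -> {:error, {:unknown_pipeline, name}}
      key -> {:ok, key}
    end
  end

  # Converts only allowlisted string keys to atoms; any other key stays a string.
  def atomize_known_keys(row, allowed) when is_map(row) and is_list(allowed) do
    lookup = Map.new(allowed, fn atom -> {Atom.to_string(atom), atom} end)
    Map.new(row, fn {key, value} -> {Map.get(lookup, key, key), value} end)
  end
end

registry = %{user_ingest: :user_pipeline, order_ingest: :order_pipeline}

SafeKeys.resolve_pipeline(registry, "user_ingest")
# => {:ok, :user_ingest}

SafeKeys.resolve_pipeline(registry, "drop_everything")
# => {:error, {:unknown_pipeline, "drop_everything"}}

# "email" becomes :email; "nickname" is not allowlisted and stays a string key.
SafeKeys.atomize_known_keys(%{"email" => "ada@example.com", "nickname" => "ada"}, [:email, :name])
```

`String.to_existing_atom/1` would also avoid creating atoms. However, it accepts any atom already loaded in the VM (module names, for example), so a membership check against the registry would still be needed.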

Environment notes:

- In sandboxed sessions, `mix` commands may fail with `:eperm` because `Mix.PubSub` opens a local TCP socket.
- If that happens, rerun `mix deps.get` / `mix test` with elevated permissions in the session.
- Homebrew reported a symlink conflict (`/opt/homebrew/bin/typer`) during Erlang setup; it was fixed with `brew link --overwrite erlang`.
- In this environment, `mix` commands require elevated permissions because of sandbox TCP socket restrictions.

Evidence from this environment:

- `uname -a` showed `arm64`.
- Homebrew installed ARM bottles (for example `elixir--1.19.5.arm64_tahoe` and `erlang--28.4.arm64_tahoe`).

FAQ:

Q: `elixir -v` fails with `exec: erl: not found` after install.
A: Erlang was not fully linked. Run `brew link --overwrite erlang`.

Q: `mix deps.get` or `mix test` fails with `failed to open a TCP socket` / `:eperm`.
A: In sandboxed sessions, rerun `mix` commands with elevated permissions.

Q: `mix pipeline.run` crashes with a JSON encoding error when an error reason is a tuple.
A: Fixed in `lib/mix/tasks/pipeline.run.ex` by serializing error reasons with `inspect/1` before JSON encoding.

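
Here is a minimal sketch of the kind of fix described above. It assumes the task builds a map per run before encoding it. `ErrorReport` and its key names are hypothetical. JSON has no tuple type, so the reason is turned into a string first.

```elixir
defmodule ErrorReport do
  # Hypothetical helper, not the project's code: builds a JSON-friendly map
  # for a run result. An error reason such as {:missing_field, :email} is
  # rendered as a string with inspect/1, because JSON cannot represent tuples.
  def to_json_safe({:error, reason}) do
    %{"status" => "error", "reason" => inspect(reason)}
  end

  # The success payload is assumed to be JSON-encodable already.
  def to_json_safe({:ok, result}) do
    %{"status" => "ok", "result" => result}
  end
end

report = ErrorReport.to_json_safe({:error, {:missing_field, :email}})
report["reason"]
# => "{:missing_field, :email}"
```

By default, `inspect/1` truncates large collections (`limit: 50`). Use `inspect(reason, limit: :infinity)` if the full term matters for logs.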
Q: Broadway producer compilation fails with a missing `:acknowledger` on `%Broadway.Message{}`.
A: Fixed in `lib/pipeline_bin/broadway/runner/producer.ex` by setting `acknowledger: {Broadway.NoopAcknowledger, nil, nil}`.

Q: After the NimbleCSV migration, CSV pipeline runs return zero rows or treat the first data row as the header.
A: Fixed in `lib/pipeline_bin/csv.ex` by correctly unwrapping `{:ok, rows}` in `with` and by parsing the stream with `skip_headers: false`.

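
NimbleCSV's `parse_string/2` and `parse_stream/2` drop the first row by default (`skip_headers: true`). Code that then reads a header from the remaining rows consumes the first data row instead. The sketch shows the post-parse step under `skip_headers: false`. `CsvRows` is hypothetical. `parse_fun` stands in for a NimbleCSV call wrapped to return `{:ok, rows}`, since NimbleCSV itself returns a plain list.

```elixir
defmodule CsvRows do
  # Hypothetical post-parse step. Assumes the parser ran with
  # skip_headers: false, so the first row is the header and every later row
  # is data. Keys and values both stay strings (no atoms from CSV input).
  def to_maps([header | data]) do
    {:ok, Enum.map(data, fn row -> header |> Enum.zip(row) |> Map.new() end)}
  end

  def to_maps([]), do: {:error, :empty_csv}

  # `parse_fun` returns {:ok, rows} or {:error, reason}. Matching
  # {:ok, raw_rows} in `with` unwraps the rows; an {:error, _} result does
  # not match and is returned unchanged.
  def load(parse_fun) when is_function(parse_fun, 0) do
    with {:ok, raw_rows} <- parse_fun.() do
      to_maps(raw_rows)
    end
  end
end

rows = [["order_id", "amount_cents"], ["ord-1", "2500"], ["ord-2", "0"]]

{:ok, maps} = CsvRows.load(fn -> {:ok, rows} end)
length(maps)
# => 2
```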
Q: Can untrusted input create atoms at runtime?
A: No. `String.to_atom/1` was removed from user-controlled paths to prevent atom table exhaustion.

Bug handling rule:
- When a bug is found and fixed, add an entry to this FAQ with the symptom and exact fix.

Start an interactive session:

```shell
iex -S mix
```

Then:

```elixir
registry = PipelineBin.Pipelines.registry()

input = %{
  external_id: "user-123",
  name: " Ada Lovelace ",
  email: " ADA@EXAMPLE.COM "
}

PipelineBin.Runner.run(registry, :user_ingest, input)
# => {:ok, %{external_id: "user-123", name: "Ada Lovelace", email: "ada@example.com", ingested_at: ...}}
```

Run a pipeline over a file:

```elixir
PipelineBin.Adapters.FileInput.run_file(
  PipelineBin.Pipelines.registry(),
  :order_ingest,
  "data/orders.csv",
  %{log_path: "var/pipeline_execution.log"}
)
```

`FileInput` supports:
- `.json`: a single object or an array of objects
- `.csv`: a header row followed by data rows
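
The `FileInput` formats listed above can be combined with step-by-step execution, as in the `PipelineBin.Runner.run/3` example, and sketched in plain Elixir. This is a hypothetical model, not `PipelineBin.Pipeline`. Steps are `{name, fun}` tuples, and `decoded` stands for an already-decoded JSON value with string keys. The email validation rule is invented for the example.

```elixir
defmodule RowPipeline do
  # Hypothetical sketch, not PipelineBin's engine: a pipeline is a list of
  # {name, fun} steps, and each fun returns {:ok, row} or {:error, reason}.
  # A row stops at its first failing step, and the step name is reported.
  def run(steps, row) do
    Enum.reduce_while(steps, {:ok, row}, fn {name, fun}, {:ok, acc} ->
      case fun.(acc) do
        {:ok, next} -> {:cont, {:ok, next}}
        {:error, reason} -> {:halt, {:error, {name, reason}}}
      end
    end)
  end

  # Decoded JSON may be one object or an array of objects; both become a
  # list of rows. Anything else is rejected before any step runs.
  def normalize_rows(object) when is_map(object), do: {:ok, [object]}

  def normalize_rows(list) when is_list(list) do
    if Enum.all?(list, &is_map/1), do: {:ok, list}, else: {:error, :expected_objects}
  end

  def normalize_rows(_other), do: {:error, :expected_object_or_array}

  # Runs every row independently, so one bad row does not stop the others.
  def run_all(steps, decoded) do
    with {:ok, rows} <- normalize_rows(decoded) do
      {:ok, Enum.map(rows, &run(steps, &1))}
    end
  end
end

# Steps modelled on the user_ingest example output (trimmed name, trimmed
# and lower-cased email); the validation rule is hypothetical.
steps = [
  {:trim_name, fn row -> {:ok, Map.update!(row, "name", &String.trim/1)} end},
  {:normalize_email,
   fn row ->
     {:ok, Map.update!(row, "email", fn email -> email |> String.trim() |> String.downcase() end)}
   end},
  {:validate_email,
   fn row ->
     if String.contains?(row["email"], "@"), do: {:ok, row}, else: {:error, :invalid_email}
   end}
]

RowPipeline.run_all(steps, %{"name" => " Ada Lovelace ", "email" => " ADA@EXAMPLE.COM "})
# => {:ok, [{:ok, %{"email" => "ada@example.com", "name" => "Ada Lovelace"}}]}
```

In this sketch, `Enum.reduce_while/3` halts a row at its first failing step. `run_all/2` keeps processing the remaining rows, so a single bad record does not abort a whole file.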

Run pipelines from the terminal:

```shell
mix pipeline.run user_ingest data/users.json --no-persist-logs
mix pipeline.run order_ingest data/orders.csv --log-path var/pipeline_execution.log
```

Broadway batch example from IEx:

```elixir
rows = [
  %{order_id: "ord-1", user_id: "usr-1", amount_cents: 2500, currency: "usd"},
  %{order_id: "ord-2", user_id: "usr-2", amount_cents: 0, currency: "usd"}
]

PipelineBin.Broadway.Runner.run_batch(
  PipelineBin.Pipelines.registry(),
  :order_ingest,
  rows,
  %{persist_logs?: false}
)
```

Oban worker enqueue example:

```elixir
%{"pipeline" => "user_ingest", "path" => "data/users.json"}
|> PipelineBin.Workers.RunPipelineWorker.new()
|> Oban.insert()
```

Project layout:

- `lib/pipeline_bin/pipeline.ex`: Pipeline struct + execution engine
- `lib/pipeline_bin/runner.ex`: Named-pipeline registry runner
- `lib/pipeline_bin/pipelines.ex`: Pipeline registry
- `lib/pipeline_bin/pipelines/step_helpers.ex`: Reusable step builders
- `lib/pipeline_bin/pipelines/user_ingest.ex`: Example production-style pipeline
- `lib/pipeline_bin/pipelines/order_ingest.ex`: Order ingestion pipeline
- `lib/pipeline_bin/adapters/file_input.ex`: JSON/CSV adapter for batch pipeline runs
- `lib/pipeline_bin/csv.ex`: NimbleCSV parsing helpers
- `lib/pipeline_bin/broadway/runner.ex`: Broadway concurrent batch runner
- `lib/pipeline_bin/broadway/runner/producer.ex`: In-memory Broadway producer
- `lib/pipeline_bin/workers/run_pipeline_worker.ex`: Oban worker for file-based runs
- `lib/mix/tasks/pipeline.run.ex`: `mix` task to execute a pipeline over a file
- `lib/pipeline_bin/execution_log.ex`: Persistent step-level execution logs
- `lib/pipeline_bin.ex`: Public convenience API
- `SECURITY.md`: Security reporting and hardening guidance
- `LICENSE`: Apache-2.0 license
- `ETHICAL_USE.md`: Ethical use policy for organizations
- `NOTICE`: Apache-2.0 attribution notice
- `.github/pull_request_template.md`: PR checklist with ethics and security review gates
- `.github/workflows/ci-cd.yml`: GitHub Actions CI/CD pipeline
- `.vscode/settings.json`: Workspace editor defaults
- `.vscode/extensions.json`: Recommended VS Code extensions
- `.vscode/tasks.json`: VS Code task shortcuts for setup/test/pipeline runs
- `.tool-versions`: asdf version pins for Erlang/Elixir
- `.mise.toml`: mise version pins for Erlang/Elixir
- `.codex/gpt-5.3/profile.json`: GPT-5.3 session profile metadata
- `.codex/gpt-5.3/session_prompt.md`: GPT-5.3 reusable project prompt
- `scripts/install_arm64_macos.sh`: Apple Silicon installer/bootstrap script
- `scripts/check_before_push.sh`: Local pre-push CI parity checks (workflow YAML + compile + tests)
- `data/users.json`: Sample user input data
- `data/orders.csv`: Sample order input data
- `test/`: ExUnit tests for core behavior
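
`lib/pipeline_bin/broadway/runner.ex` is described above as a Broadway concurrent batch runner. Its core idea is to fan rows out to concurrent workers and collect per-row results in order. That idea can be sketched without Broadway using the standard library's `Task.async_stream/3`. This is a swapped-in technique for illustration, not how `PipelineBin.Broadway.Runner` is implemented. The module name, options and validation rule are hypothetical.

```elixir
defmodule ConcurrentBatch do
  # Hypothetical stand-in for a Broadway batch run: apply `fun` to each row
  # concurrently with Task.async_stream/3 and return results in input order.
  # Rows that exceed :timeout are killed and reported as
  # {:error, {:exit, :timeout}}. An exception inside `fun` still crashes the
  # caller, because the tasks are linked to it.
  def run_batch(rows, fun, opts \\ []) do
    rows
    |> Task.async_stream(fun,
      max_concurrency: Keyword.get(opts, :max_concurrency, System.schedulers_online()),
      timeout: Keyword.get(opts, :timeout, 5_000),
      on_timeout: :kill_task,
      ordered: true
    )
    |> Enum.map(fn
      {:ok, result} -> result
      {:exit, reason} -> {:error, {:exit, reason}}
    end)
  end
end

rows = [
  %{order_id: "ord-1", amount_cents: 2500},
  %{order_id: "ord-2", amount_cents: 0}
]

# Hypothetical rule for illustration only: reject non-positive amounts.
ConcurrentBatch.run_batch(rows, fn
  %{amount_cents: cents} = row when cents > 0 -> {:ok, row}
  row -> {:error, {:invalid_amount, row.order_id}}
end)
# => [{:ok, %{amount_cents: 2500, order_id: "ord-1"}}, {:error, {:invalid_amount, "ord-2"}}]
```

Broadway adds what this sketch lacks: GenStage-based back-pressure, batching and message acknowledgement.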