SQLite Workflow Manager is a small SQL-ish ETL tool to run parameterized queries.
Typical usage:
- Load csv files from a folder (or anything else) into an SQLite database
- Make some SQL transformations, e.g. join, filter or pivot
- Export the result to Access.
A workflow is a sequence of tasks to do. Each task is one of: `SQL`, `Var`, `Router` and `Sub` (flow). Links define how a flow will be executed and don't carry any implicit data.
On start, Workflow Manager collects the tasks without input links and executes them one by one, moving to the next tasks by links and adding them to the collection. It then takes any task from the collection (tasks with fewer input links have a higher priority), executes it and adds the linked tasks to the collection. When there are no more tasks to execute, the workflow stops. Due to this mechanism, Workflow Manager allows cyclic graphs (e.g. for looping), unlike other tools.
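The scheduling described above can be pictured roughly as follows. This is a minimal Python sketch under assumptions: the task graph shape, the `run_task` callback and the priority rule are illustrative simplifications, not the tool's actual implementation.

```python
from collections import Counter

def run_workflow(tasks, links, run_task):
    """tasks: {task_id: payload}; links: [(src, dst), ...]."""
    # Count input links per task; tasks without any input link start first.
    in_links = Counter(dst for _, dst in links)
    pool = [t for t in tasks if in_links[t] == 0]
    executed = []
    while pool:
        # Tasks with fewer input links have a higher priority.
        pool.sort(key=lambda t: in_links[t])
        task = pool.pop(0)
        run_task(task, tasks[task])
        executed.append(task)
        # Follow outgoing links and queue the linked tasks.
        for src, dst in links:
            if src == task and dst not in pool:
                pool.append(dst)
    return executed

# A diamond-shaped flow: A feeds B and C, both feed D.
order = run_workflow(
    {"A": None, "B": None, "C": None, "D": None},
    [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")],
    lambda task_id, payload: None,
)
print(order)  # -> ['A', 'B', 'C', 'D']
```

Because finished tasks may be re-queued when a link points back at them, a cycle simply keeps the loop alive until no link fires anymore.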
A workflow requires an SQLite database as a context to perform SQL queries. This database is selected on workflow start.
The `SQL` task runs SQL queries, executes shell commands, exports/imports data via ODBC and performs data transformations. You can perform multiple queries at once by dividing statements with semicolons, e.g. `drop table if exists t; select odbc_read('DSN=CSV', 'select * from animals.csv', 'animals')`. All operations are available through SQLite extensions (see below).
The `Var` task sets or updates local workflow variables. Variable values are substituted in the SQL statement before execution. If a value is `NULL` then an empty string takes its place. To avoid unpredictable behaviour, use a `$` prefix (uppercase is applied automatically), e.g. `$IS_DONE`, `$DSN_XLS`.
The value or expression of a variable should be a valid SQL query (`select` can be omitted). If the query returns multiple columns then dotted variables are defined. For example, the variable `$ABC` with the expression `select 123 a, 'Steve' person` defines two variables: `$ABC.A` (123) and `$ABC.PERSON` (Steve).
Don't forget to use quotes for string values, e.g. `'2020-11-02'`.
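The definition and substitution mechanism can be mimicked in plain Python over sqlite3. A sketch of the behaviour described above; the helper names `eval_var` and `substitute` are illustrative, not part of the tool:

```python
import sqlite3

def eval_var(db, name, expr):
    """Evaluate a Var expression; multi-column results become dotted variables."""
    cur = db.execute("select " + expr)
    row = cur.fetchone()
    cols = [d[0] for d in cur.description]
    if len(cols) == 1:
        return {name.upper(): row[0]}
    # Multiple columns produce $NAME.COL variables (uppercased automatically).
    return {f"{name}.{c}".upper(): v for c, v in zip(cols, row)}

def substitute(sql, variables):
    """Replace $-prefixed variables in an SQL statement; NULL becomes empty."""
    # Longer names first, so $ABC.PERSON is replaced before $ABC.
    for name in sorted(variables, key=len, reverse=True):
        value = variables[name]
        sql = sql.replace(name, "" if value is None else str(value))
    return sql

db = sqlite3.connect(":memory:")
vars_ = eval_var(db, "$ABC", "123 a, 'Steve' person")
# vars_ == {"$ABC.A": 123, "$ABC.PERSON": "Steve"}
print(substitute("insert into t values ($ABC.A, '$ABC.PERSON')", vars_))
# -> insert into t values (123, 'Steve')
```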
There are some internal variables. All of them start with `$$`.
- `$$SID` - Session id. Can be used to create unique table names.
- `$$NODE6.STATUS` - Status of the earlier executed task with id = 6: `DONE` or `FAIL`. Used in `Router` tasks.
- `$$NODE6.DATA` - If an `SQL` task returns rows then it's the first column of the first row. For an `If` task it's an array with the condition results, e.g. `[0, 1, 0]`.
- `$$ARG1`, `$$ARG2`, ... - arguments passed on workflow start.
Global variables are defined in `prefs.sqlite` > `workflow_globals` and they are read-only during a workflow run.
Example

| Name | Value or condition |
|---|---|
| $LOOPER | $LOOPER + 1 |
| $TBLNAME | '$$NODE6.DATA' |
| $PERSON | select name, birthdate from persons where id = $$ARG1 |
| $START_DATE | '$$ARG1' |
The `Router` task is a fork in flow execution. A router task can have multiple conditions defining multiple ways the flow can continue. Each condition should be a valid SQL query (`select` can be omitted) that returns `true` or `false`.
Example

| Name | Condition |
|---|---|
| Loop OK | $LOOPER > 3 |
| Loop Next | $LOOPER <= 3 |
| Load OK | '$$NODE6.STATUS' = 'DONE' and json_extract('$$NODE6.DATA','$.error') is null |
| Load Fail | '$$NODE6.STATUS' = 'FAIL' |
| Anyway | 1 |
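Since each condition is itself an SQL query, it can be evaluated against the SQLite context once variables are substituted. A rough Python illustration of the assumed mechanics (SQLite reports booleans as 1/0):

```python
import sqlite3

def check_condition(db, condition):
    """A Router condition is a one-value SQL query evaluated to true/false."""
    (value,) = db.execute("select " + condition).fetchone()
    return bool(value)

db = sqlite3.connect(":memory:")
# After substitution, "$LOOPER > 3" might become e.g. "4 > 3":
print(check_condition(db, "4 > 3"))            # -> True
print(check_condition(db, "'DONE' = 'FAIL'"))  # -> False
```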
The `Sub` task runs another workflow. Arguments should be set as a valid SQL query (`select` can be omitted), e.g. `'2020-10-01', 'Bill', 123`.
Passed arguments will be available in the subflow as `$$ARG1`, `$$ARG2`, ...
If the query returns multiple rows then the subflow is executed for each result row. It's the simplest way to make a loop.
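The row-per-run behaviour can be sketched like this; `run_subflow` is a hypothetical stand-in for the actual subflow call:

```python
import sqlite3

def run_sub(db, args_query, run_subflow):
    """Execute the subflow once per result row; columns become $$ARG1, $$ARG2, ..."""
    for row in db.execute("select " + args_query):
        args = {f"$$ARG{i}": v for i, v in enumerate(row, start=1)}
        run_subflow(args)

db = sqlite3.connect(":memory:")
db.execute("create table persons (id integer, name text)")
db.executemany("insert into persons values (?, ?)", [(1, "Bill"), (2, "Ann")])
# The subflow runs twice, once per person:
run_sub(db, "id, name from persons", lambda a: print(a["$$ARG1"], a["$$ARG2"]))
```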
Workflow Manager uses SQLite extensions. There are four extensions implementing the basic ETL functionality.
The odbc extension is used to export/import any external data.
- `odbc_read(connectionString, query, target)`
  Reads data via ODBC from an external source and writes it to an SQLite table. If the target table doesn't exist, it'll be created. Use `TABLES` as the query to obtain a table list.
  `select odbc_read('Driver={Microsoft Text Driver (*.txt; *.csv)};Dbq=D:/csv/', 'select * from animals.csv', 'animals')`
- `odbc_write(query, connectionString, target)`
  Uploads a query result set from SQLite to an external database. The target table must exist.
  `select odbc_write('select * from animals where family = "Felidae"', 'DSN=Zoo.mdb', 'cats')`
- `odbc_query(connectionString, query)`
  Executes a query on an external database, e.g. to create or purge a target table.
  `select odbc_query('DSN=Zoo', 'create table cats (id integer, name varchar2(50))')`
- `odbc_dsn()`
  Returns the local DSN list as a json array: `{"result": ["MyData", "Csv", ...], ...}`
  `select odbc_dsn()`
Remarks
- A result of a function is json: `{"result": "ok", ...}` on success and `{"error": "<msg>"}` on error.
- Use the 32-bit ODBC manager `C:\Windows\SysWOW64\odbcad32.exe` to define a local DSN.
- You should install the appropriate ODBC driver to gain access to MySQL, PostgreSQL, Mongo, etc.
Examples of connection strings (more)

| Driver | String |
|---|---|
| Txt, csv | DSN=MyCSV |
| Txt, csv | Driver={Microsoft Text Driver (*.txt; *.csv)};Dbq=c:/data/;Extensions=asc,csv,tab,txt; |
| Excel | Driver={Microsoft Excel Driver (*.xls)};Dbq=C:/MyBook.xls;ReadOnly=0; |
| Access | Driver={Microsoft Access Driver (*.mdb)};Dbq=C:/mydb.mdb;Uid=Admin;Pwd=; |
| PostgreSQL | Driver={PostgreSQL};Server=192.168.0.100;Port=5432;Database=myDB;Uid=user1;Pwd=pass1; |
| MySQL | Driver={MySQL ODBC 5.2 UNICODE Driver};Server=192.168.0.100;Database=myDB;User=user1;Password=pass1;Option=3; |
Provides data transformation operations.
pivot (query, target)
Calculates a pivot table and puts the result into a target table. If the target table exists then it will be recreated.
The first column stays identical to the first result column, the values of the second column are used as result columns, and the values of the third column are used as result values.
`select pivot('select manager, product, qty from orders', 'mytable')`
`select pivot('select manager, strftime(''01-%m-%Y'', saledate) date, qty from orders', 'mytable')`
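To make the column mapping concrete, here is roughly what `pivot` computes, reproduced in plain Python over sqlite3. An illustrative reimplementation under stated assumptions, not the extension itself:

```python
import sqlite3

def pivot(db, query, target):
    """Column 1 -> row key, column 2's values -> new columns, column 3 -> values."""
    rows = db.execute(query).fetchall()
    keys, columns = [], []
    for k, c, _ in rows:
        if k not in keys:
            keys.append(k)
        if c not in columns:
            columns.append(c)
    db.execute(f"drop table if exists {target}")
    cols_sql = ", ".join(f'"{c}"' for c in columns)
    db.execute(f"create table {target} (key, {cols_sql})")
    # Fill a key -> {column: value} grid, missing cells stay NULL.
    data = {k: {c: None for c in columns} for k in keys}
    for k, c, v in rows:
        data[k][c] = v
    for k in keys:
        db.execute(f"insert into {target} values (?{', ?' * len(columns)})",
                   [k] + [data[k][c] for c in columns])

db = sqlite3.connect(":memory:")
db.execute("create table orders (manager, product, qty)")
db.executemany("insert into orders values (?, ?, ?)",
               [("Ann", "milk", 5), ("Ann", "eggs", 2), ("Bob", "milk", 3)])
pivot(db, "select manager, product, qty from orders", "mytable")
print(db.execute("select * from mytable").fetchall())
# -> [('Ann', 5, 2), ('Bob', 3, None)]
```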
unpivot (query, target)
Calculates an unpivot table and puts the result into a target table. If the target table exists then it will be recreated.
The first column stays identical to the first result column; all remaining column names become values of the `c1` column, and the values of those columns go to the `c2` column.
`select unpivot('select manager, milk, eggs, bread from pivot_orders', 'mytable')` --> manager, c1, c2
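The inverse mapping can be sketched the same way (again an illustrative Python reimplementation, not the extension's code):

```python
import sqlite3

def unpivot(db, query, target):
    """Keep the first column; remaining column names go to c1, their values to c2."""
    cur = db.execute(query)
    cols = [d[0] for d in cur.description]
    rows = cur.fetchall()
    db.execute(f"drop table if exists {target}")
    db.execute(f"create table {target} ({cols[0]}, c1, c2)")
    for row in rows:
        for name, value in zip(cols[1:], row[1:]):
            db.execute(f"insert into {target} values (?, ?, ?)", (row[0], name, value))

db = sqlite3.connect(":memory:")
db.execute("create table pivot_orders (manager, milk, eggs)")
db.execute("insert into pivot_orders values ('Ann', 5, 2)")
unpivot(db, "select manager, milk, eggs from pivot_orders", "mytable")
print(db.execute("select * from mytable").fetchall())
# -> [('Ann', 'milk', 5), ('Ann', 'eggs', 2)]
```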
jsontable(json, target)
Transforms a json array into a table. If the target table exists then it will be recreated.
`select jsontable('["a", "b"]', 'mytable')` --> creates "mytable" with a "c1" column and two rows "a" and "b"
`select jsontable('[{"a": 10, "b": 20}, {"a": 100, "b": 200}]', 'mytable')` --> creates and fills "mytable" with "a" and "b" columns
`select jsontable('[["10", "20"], ["100", "200"]]', 'mytable')` --> creates and fills "mytable" with "c1" and "c2" columns
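The three cases above (scalars, objects, nested arrays) can be captured in a short Python sketch of the assumed behaviour:

```python
import json
import sqlite3

def jsontable(db, text, target):
    """Turn a json array into a table; dicts give named columns, else c1, c2, ..."""
    data = json.loads(text)
    first = data[0]
    if isinstance(first, dict):
        cols = list(first)
        rows = [[item.get(c) for c in cols] for item in data]
    elif isinstance(first, list):
        cols = [f"c{i}" for i in range(1, len(first) + 1)]
        rows = data
    else:  # plain scalars -> a single c1 column
        cols, rows = ["c1"], [[item] for item in data]
    db.execute(f"drop table if exists {target}")
    db.execute(f"create table {target} ({', '.join(cols)})")
    for row in rows:
        db.execute(f"insert into {target} values ({', '.join('?' * len(cols))})", row)

db = sqlite3.connect(":memory:")
jsontable(db, '[{"a": 10, "b": 20}, {"a": 100, "b": 200}]', "mytable")
print(db.execute("select * from mytable").fetchall())  # -> [(10, 20), (100, 200)]
```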
txttable(text, target)
Splits text into lines and puts the result into the target table. If the target table exists then it will be recreated.
`select txttable('line1'|| x'0d' || x'0a' || 'line2'|| x'0a' || 'line3', 'mytable')` --> creates "mytable" with a "c1" column and 3 rows: "line1", "line2" and "line3"
A special extension to interrupt flow execution or an SQL task.
ASSERT(condition, msg = NULL)
Throws an error if the condition returns `FALSE`.
`select ASSERT(1 = 2, 'INEQUALITY')`
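A user-defined SQLite function with the same contract can be sketched in Python: raising an exception inside the function aborts the running statement. Illustrative only, not the extension's implementation:

```python
import sqlite3

def sql_assert(condition, msg=None):
    # SQLite passes a false condition as 0; raising here aborts the SQL statement.
    if not condition:
        raise ValueError(msg or "ASSERTION FAILED")
    return 1

db = sqlite3.connect(":memory:")
db.create_function("ASSERT", -1, sql_assert)  # -1 allows the optional msg argument
db.execute("select ASSERT(1 = 1)")  # passes
try:
    db.execute("select ASSERT(1 = 2, 'INEQUALITY')")
except sqlite3.OperationalError:
    print("statement aborted")
```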
This extension is used to run any external app and grab its output.
exec(cmd, codepage = 'UTF16')
Executes a shell command and returns the console output as the result.
`codepage` defines the code page of the command output and is one of: `ANSI`, `CP437`, `UTF7`, `UTF8`, `UTF16`.
If cmd starts with `powershell` and `codepage` is empty then `CP437` is used.
`select exec('powershell -nologo "Get-Content C:/data.txt"')`
`select * from exec('powershell Get-Content C:/data.txt -Encoding UTF8', 'CP437')`
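The behaviour of `exec` can be approximated with `subprocess`: run the command through the shell, capture stdout, and decode it with the chosen code page. A cross-platform sketch; the real extension is Windows-specific and handles the code-page list above:

```python
import subprocess
import sys

def exec_cmd(cmd, codepage="utf-16"):
    """Run a shell command and return its console output decoded with codepage."""
    out = subprocess.run(cmd, shell=True, capture_output=True).stdout
    return out.decode(codepage, errors="replace")

# The current Python interpreter stands in for a shell tool here:
print(exec_cmd(f'"{sys.executable}" -c "print(123)"', codepage="utf-8"))
```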
PowerShell snippets

| Task | Command |
|---|---|
| Read a file list | Get-ChildItem -Path D:/dir -Name |
| Read a file | Get-Content D:/data.json -Encoding UTF8 |
| Make a file | New-Item D:/file.txt -ItemType File |
| Wait for a file | While (!(Test-Path D:/flag.txt -ErrorAction SilentlyContinue)){} |
| Get data by url | Invoke-RestMethod -Uri https://some-url/api/ (In Win7 you should update PS to v3) |
`sqlite-wf <workflow-name> <database-path> <arg1> <arg2> ...`
`workflow-name` and `database-path` are mandatory arguments.
Use quotes to pass arguments with spaces, e.g. `"2020-03-12 11:00"`.
Passed `arg1`, `arg2`, ... will be available as `$$ARG1`, `$$ARG2`, ...
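The mapping from the command line to the workflow's `$$ARG` variables can be sketched as follows (a hypothetical helper, not the tool's own parser):

```python
def parse_args(argv):
    """Map command-line arguments to the $$ARG variables the workflow sees."""
    if len(argv) < 2:
        raise SystemExit("workflow-name and database-path are mandatory")
    workflow, database, *rest = argv
    args = {f"$$ARG{i}": a for i, a in enumerate(rest, start=1)}
    return workflow, database, args

print(parse_args(["my_flow", "work.sqlite", "2020-03-12 11:00", "123"]))
```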