Выгрузить данные из radacct, исключая битые строки, можно с помощью скрипта radacct_exporter.py
Для его запуска необходимо установить зависимости из requirements.txt
Их можно установить следующим образом:
sudo apt install python3-pip pip3 install -r requirements.txt |
Скрипт radacct_exporter.py имеет следующие аргументы командной строки:
Скрипт выгружает данные в папку, из которой запущен скрипт.
Выгрузка представляет собой архивы с именем соответствующим дате, за которую были выгружены данные.
Таким образом можно выгружать данные частями (например, по месяцам или неделям) в зависимости от того, сколько места на жестком диске есть в наличии.
Пример запуска скрипта:
python3 radacct_exporter.py --start_date 2020-01-01 --stop_date 2020-02-01 --host mysql_slave_host |
Так будут выгружены данные с 01.01.2020 по 31.01.2020 включительно.
Перед загрузкой данных следует установить ClickHouse, создать БД radius и конечную таблицу radius.radacct. См. Установка ClickHouse
Узнать какой порядок столбцов в radius.radacct в MySQL:
DESCRIBE radius.radacct; |
Создать временную таблицу в ClickHouse, учитывая порядок столбцов в MySQL radacct (он должен совпадать):
CREATE TABLE radius.radacct_tmp
(
radacctid UInt64,
acctsessionid Nullable(String),
acctuniqueid Nullable(String),
username Nullable(String),
groupname Nullable(String),
domain Nullable(String),
realm Nullable(String),
nasipaddress FixedString(15),
nasportid Nullable(String),
nasporttype Nullable(String),
acctstarttime DateTime,
acctupdatetime Nullable(DateTime),
acctstoptime Nullable(DateTime),
acctinterval Nullable(Int32),
acctsessiontime Nullable(Int32),
acctauthentic Nullable(String),
connectinfo_start Nullable(String),
connectinfo_stop Nullable(String),
acctinputoctets Nullable(Int64),
acctoutputoctets Nullable(Int64),
calledstationid String,
callingstationid FixedString(17),
acctterminatecause String,
servicetype Nullable(String),
framedprotocol Nullable(String),
framedipaddress FixedString(15),
acctstartdelay Nullable(Int32),
acctstopdelay Nullable(Int32),
xascendsessionsvrkey Nullable(String),
inputpacketsdrop Nullable(Int64),
outputpacketsdrop Nullable(Int64),
inputbytesdrop Nullable(Int64),
outputbytesdrop Nullable(Int64),
outputpacketlost Nullable(Int64),
acctl2interface Nullable(String),
acctapdomain Nullable(String),
acctapid Nullable(String),
acctssid Nullable(String),
security UInt8 DEFAULT 1,
ssidtype UInt8 DEFAULT 0
) ENGINE = MergeTree
PARTITION BY toYYYYMM(acctstarttime)
ORDER BY (acctstarttime)
SETTINGS index_granularity = 8192 |
Версия без переносов строк и табуляций:
CREATE TABLE radius.radacct_tmp (radacctid UInt64, acctsessionid Nullable(String), acctuniqueid Nullable(String), username Nullable(String), groupname Nullable(String), domain Nullable(String), realm Nullable(String), nasipaddress FixedString(15), nasportid Nullable(String), nasporttype Nullable(String), acctstarttime DateTime, acctupdatetime Nullable(DateTime), acctstoptime Nullable(DateTime), acctinterval Nullable(Int32), acctsessiontime Nullable(Int32), acctauthentic Nullable(String), connectinfo_start Nullable(String), connectinfo_stop Nullable(String), acctinputoctets Nullable(Int64), acctoutputoctets Nullable(Int64), calledstationid String, callingstationid FixedString(17), acctterminatecause String, servicetype Nullable(String), framedprotocol Nullable(String), framedipaddress FixedString(15), acctstartdelay Nullable(Int32), acctstopdelay Nullable(Int32), xascendsessionsvrkey Nullable(String), inputpacketsdrop Nullable(Int64), outputpacketsdrop Nullable(Int64), inputbytesdrop Nullable(Int64), outputbytesdrop Nullable(Int64), outputpacketlost Nullable(Int64), acctl2interface Nullable(String), acctapdomain Nullable(String), acctapid Nullable(String), acctssid Nullable(String), security UInt8 DEFAULT 1, ssidtype UInt8 DEFAULT 0) ENGINE = MergeTree PARTITION BY toYYYYMM(acctstarttime) ORDER BY (acctstarttime) SETTINGS index_granularity = 8192 |
Загрузить данные во временную таблицу, запустив следующую команду в папке с архивами:
gzip -d -c `ls -cl . | awk '{print $9}'` | clickhouse-client -ujavauser --password=javapassword --input_format_allow_errors_num 10000 --input_format_allow_errors_ratio 0.1 --query="INSERT INTO radius.radacct_tmp FORMAT CSV"; |
При необходимости указать хост, на котором расположен ClickHouse, после clickhouse-client --host
Если выгрузка и загрузка данных делается порционно, то необходимо удалять старые архивы, чтобы не было дублирования данных в ClickHouse
Переместить данные в radius.radacct с помощью команды:
INSERT INTO radius.radacct SELECT assumeNotNull(acctsessionid) as acctsessionid, assumeNotNull(acctuniqueid) as acctuniqueid, assumeNotNull(username) as username, assumeNotNull(groupname) as groupname, assumeNotNull(domain) as domain, assumeNotNull(realm) as realm, assumeNotNull(nasipaddress) as nasipaddress, assumeNotNull(nasportid) as nasportid, assumeNotNull(nasporttype) as nasporttype, assumeNotNull(acctstarttime) as acctstarttime, acctupdatetime, acctstoptime, toUInt32(assumeNotNull(acctinterval)) as acctinterval, toUInt32(assumeNotNull(acctsessiontime)) as acctsessiontime, assumeNotNull(acctauthentic) as acctauthentic, assumeNotNull(connectinfo_start) as connectinfo_start, assumeNotNull(connectinfo_stop) as connectinfo_stop, toUInt64(assumeNotNull(if(acctinputoctets < 0, 0, acctinputoctets))) as acctinputoctets, toUInt64(assumeNotNull(if(acctoutputoctets < 0, 0, acctoutputoctets))) as acctoutputoctets, calledstationid, callingstationid, acctterminatecause, assumeNotNull(servicetype) as servicetype, assumeNotNull(framedprotocol) as framedprotocol, framedipaddress, toUInt32(assumeNotNull(acctstartdelay)) as acctstartdelay, toUInt32(assumeNotNull(acctstopdelay)) as acctstopdelay, assumeNotNull(xascendsessionsvrkey) as xascendsessionsvrkey, toUInt64(assumeNotNull(if(inputpacketsdrop < 0, 0, inputpacketsdrop))) as inputpacketsdrop, toUInt64(assumeNotNull(if(outputpacketsdrop < 0, 0, outputpacketsdrop))) as outputpacketsdrop, toUInt64(assumeNotNull(if(inputbytesdrop < 0, 0, inputbytesdrop))) as inputbytesdrop, toUInt64(assumeNotNull(if(outputbytesdrop < 0, 0, outputbytesdrop))) as outputbytesdrop, toUInt64(assumeNotNull(if(outputpacketlost < 0, 0, outputpacketlost))) as outputpacketlost, assumeNotNull(acctl2interface) as acctl2interface, assumeNotNull(acctapdomain) as acctapdomain, assumeNotNull(acctapid) as acctapid, assumeNotNull(acctssid) as acctssid, security, ssidtype FROM radius.radacct_tmp; |
или без переносов и табуляций:
INSERT INTO radius.radacct SELECT assumeNotNull(acctsessionid) as acctsessionid, assumeNotNull(acctuniqueid) as acctuniqueid, assumeNotNull(username) as username, assumeNotNull(groupname) as groupname, assumeNotNull(domain) as domain, assumeNotNull(realm) as realm, assumeNotNull(nasipaddress) as nasipaddress, assumeNotNull(nasportid) as nasportid, assumeNotNull(nasporttype) as nasporttype, assumeNotNull(acctstarttime) as acctstarttime, acctupdatetime, acctstoptime, toUInt32(assumeNotNull(acctinterval)) as acctinterval, toUInt32(assumeNotNull(acctsessiontime)) as acctsessiontime, assumeNotNull(acctauthentic) as acctauthentic, assumeNotNull(connectinfo_start) as connectinfo_start, assumeNotNull(connectinfo_stop) as connectinfo_stop, toUInt64(assumeNotNull(if(acctinputoctets < 0, 0, acctinputoctets))) as acctinputoctets, toUInt64(assumeNotNull(if(acctoutputoctets < 0, 0, acctoutputoctets))) as acctoutputoctets, calledstationid, callingstationid, acctterminatecause, assumeNotNull(servicetype) as servicetype, assumeNotNull(framedprotocol) as framedprotocol, framedipaddress, toUInt32(assumeNotNull(acctstartdelay)) as acctstartdelay, toUInt32(assumeNotNull(acctstopdelay)) as acctstopdelay, assumeNotNull(xascendsessionsvrkey) as xascendsessionsvrkey, toUInt64(assumeNotNull(if(inputpacketsdrop < 0, 0, inputpacketsdrop))) as inputpacketsdrop, toUInt64(assumeNotNull(if(outputpacketsdrop < 0, 0, outputpacketsdrop))) as outputpacketsdrop, toUInt64(assumeNotNull(if(inputbytesdrop < 0, 0, inputbytesdrop))) as inputbytesdrop, toUInt64(assumeNotNull(if(outputbytesdrop < 0, 0, outputbytesdrop))) as outputbytesdrop, toUInt64(assumeNotNull(if(outputpacketlost < 0, 0, outputpacketlost))) as outputpacketlost, assumeNotNull(acctl2interface) as acctl2interface, assumeNotNull(acctapdomain) as acctapdomain, assumeNotNull(acctapid) as acctapid, assumeNotNull(acctssid) as acctssid, security, ssidtype FROM radius.radacct_tmp; |
Перемещение данных в radius.radacct можно выполнять после загрузки всех данных во временную таблицу, либо после загрузки каждой порции данных.
Если перемещать данные после каждой порции, то нужно сохранять уникальность данных.
Это можно сделать двумя способами:
Очищать radacct_tmp после insert'а в radius.radacct
TRUNCATE TABLE radius.radacct_tmp |
Добавлять условие по времени вида:
... FROM radius.radacct_tmp WHERE acctstarttime between '2020-01-01 00:00:00' and '2020-02-01 00:00:00' |
После миграции всех данных можно удалить временную таблицу
DROP TABLE radius.radacct_tmp |