A hálózati események feldolgozása és tárolása rengeteg adatot generálhat, ami gyorsan több terabyte-os méretet ölthet naponta. Ezért fontos alaposan megfontolni, hogy mikor és hova adjunk hozzá kiegészítő mezőket, amelyek javíthatják az analitikai képességeinket, anélkül, hogy túlterhelnénk az adatbázist. A kiegészítő mezők hozzáadása hasznos lehet, de kérdés, hogy az analitikusok valóban használni fogják-e ezeket, és hogy képesek leszünk-e kezelni a megnövekedett eseményméreteket. A tárolási korlátok és az adatok hasznosságának közötti egyensúly megtalálása elengedhetetlen.

A hálózati események szűrésének és értelmezésének alapja, hogy pontosan tudjuk, hogy egy adott esemény melyik irányban áramlik a hálózaton. Az egyszerűbb megoldások, mint például az if-else elágazások alkalmazása, képesek eldönteni, hogy az esemény bejövő vagy kimenő, de a felhasználói élmény javítása érdekében célszerű lehet új mezőt hozzáadni, amely világosan jelzi az irányt.

Az irányok (be- és kimenő) egyértelmű nyilvántartása gyakran szükséges a hálózati események elemzéséhez. Egy egyszerű, de hatékony szűrőt hozhatunk létre, amely beépíti ezt az új mezőt, így a keresési lekérdezések, mint például a kimenő forgalom keresése, egyszerűbbé válnak. Ez egy intuitívabb keresési élményt biztosít az analitikusok számára, és könnyebbé teszi az API lekérdezések megtervezését.

A következő példában bemutatjuk, hogyan hozhatunk létre egy új mezőt, amely a hálózati irányt jelöli. Az új mező, például a network.direction, amelyet a tag-ek alapján számítunk ki, világosabbá teszi a lekérdezéseket, így ahelyett, hogy az analitikusoknak a tag-ek nevét kellene ismerniük, egyszerűen kereshetnek a network.direction:outbound mezőben.

A Logstash szűrők programozásakor a mutate függvény segítségével új mezőket adhatunk hozzá az eseményekhez. Az alábbi példa bemutatja, hogyan hozhatjuk létre a network.direction mezőt a hálózati események irányának figyelembevételével:

plaintext
filter { if [source][ip] and [destination][ip] { if "source_homenet" in [tags] and "destination_homenet" in [tags] { mutate { add_field => { "[network][direction]" => "internal" } } } else if "source_homenet" in [tags] and "destination_homenet" not in [tags] { mutate { add_field => { "[network][direction]" => "outbound" } } } else if "source_homenet" not in [tags] and "destination_homenet" in [tags] { mutate { add_field => { "[network][direction]" => "inbound" } } } else { mutate { add_field => { "[network][direction]" => "unknown" } } } } }

A fenti kódban először ellenőrizzük, hogy a forrás- és cél IP címek léteznek-e. Ha mindkét cím jelen van, az eseményt a network.direction mezővel látjuk el, amelyet ennek megfelelően belső (internal), kimenő (outbound) vagy bejövő (inbound) irányba kategorizálunk. Ha egyik cím sem található meg, akkor az irányt "unknown"-ra állítjuk, ami azt jelzi, hogy valami nem stimmelhet a hálózati konfigurációval.

Ezután a következő lépés a szűrők futtatása, hogy megbizonyosodjunk arról, hogy a mezők helyesen kerültek hozzáadásra az eseményekhez. A stdout plugint használjuk a kimenet megjelenítésére, amely segít az adatok vizualizálásában és a hibák gyors felismerésében.

A következő kódrészlet bemutatja, hogyan lehet ezt elérni:

plaintext
output {
stdout { codec => rubydebug } }

Amikor a kódot végrehajtjuk, ellenőrizzük, hogy a szűrők megfelelően vannak-e egymásba ágyazva, a karakterláncok zárójelei rendben vannak-e, és hogy az egyes lehetőségek a megfelelő értékekkel vannak-e elválasztva.

Miután az események feldolgozása megtörtént, a Logstash futtatásával könnyedén megjeleníthetjük a kívánt kimenetet, amely a következő formában jelenhet meg:

plaintext
{ "@timestamp" => 2040-01-12T04:22:17.184746833Z, "host" => { "name" => "logstash.local" }, "destination" => { "geo" => { "city_name" => "St. Louis", "country_name" => "United States" }, "ip" => "192.168.8.138", "user" => { "full_name" => "Bob Robertson" } }, "source" => { "geo" => { "city_name" => "St. Louis", "country_name" => "United States" }, "ip" => "192.168.8.133", "user" => { "full_name" => "Alice Allison" } }, "network" => { "direction" => "internal" }, "tags" => [ "source_homenet", "destination_homenet" ] }

A Logstash ezen szűrők révén nemcsak a hálózati események irányát és típusát tudjuk megkülönböztetni, hanem lehetőséget ad arra is, hogy az adatokat bővítsük és összekapcsoljuk más rendszerekkel, például az Active Directory-val vagy az Ansible-lel, hogy automatizálhassuk az események blokkolását vagy riasztások generálását.

Az események közötti mezők összehasonlítása egy másik fontos aspektusa a Logstash szűrők használatának. Az egyenlőség (==) és az egyenlőtlenség (!=) összehasonlítások alapvető eszközei a szűrési logikának. Ezen kívül ellenőrizhetjük egy mező létezését, vagy azt is, hogy egy érték szerepel-e egy tömbben, így pontosabb és rugalmasabb feldolgozást érhetünk el. Az egyszerű mezőösszehasonlítások révén képesek vagyunk meghatározni, hogy egy esemény megfelel-e bizonyos kritériumoknak, és ha igen, akkor mit kell vele tenni.

Hogyan optimalizáljuk a Kafka és Logstash integrációt biztonságos adatfeldolgozási környezetekben?

A Kafka és a Logstash kombinációja lehetővé teszi az adatok rugalmas és skálázható kezelését, különösen akkor, ha biztonságos adatfeldolgozási környezetekben dolgozunk. Az alábbiakban részletesen bemutatjuk, hogyan konfigurálhatjuk ezt az integrációt különböző munkafolyamatokhoz, valamint hogyan érhetjük el a kívánt adatfeldolgozási eredményeket a biztonsági előírások figyelembevételével.

A konfiguráció első lépése a Logstash alkalmazás hozzáadása a Logstash fogyasztói csoporthoz. Az SSL alapú kommunikációval a Kafka és a Logstash közötti kapcsolat biztonságos, ahol a Kafka hosztnevek ellenőrzése TLS tanúsítványok segítségével történik. A Logstash konfigurációs fájlban a kulcsfontosságú beállítások az SSL kapcsolatok konfigurálásához szükséges információkat tartalmazzák, például a keystore és truststore fájlok helyét és jelszavait. Az SSL kapcsolat biztosítja, hogy minden Kafka üzenet biztonságosan legyen továbbítva.

A következő lépés a Kafka témák (topics) listázása, amely a Logstash számára a különböző adatforrásokat határozza meg. A decorate_events opció lehetővé teszi, hogy a Kafka metainformációkat hozzáadjuk az eseményekhez, amelyek később feldolgozásra kerülnek. Ezen metainformációk között található az @metadata.kafka.topic mező, amely alapján a Logstash szűrőjével eldönthetjük, hogy az adott esemény megfelel-e a Beats program által előállított adatoknak. Ha az események tartalma megfelel az elvárásoknak, a JSON feldolgozó fut, és kibővíti az eredeti Beats mezőket az eseményeken belül.

A továbbiakban, amennyiben az event.original mező megegyezik az message mezővel, akkor eltávolítjuk az event.original mezőt, így csökkentve az adatméretet és a tárolási igényt. Ez az eljárás segít abban, hogy a Kafka rendszer hatékonyan kezelje az adatokat, miközben biztosítja, hogy a feldolgozott adatok megfeleljenek a kívánt szabványoknak.

A Kafka adatfeldolgozási környezetében többféle Logstash elhelyezési lehetőség is szóba jöhet. Az egyik lehetőség, hogy a Logstash-ot a Kafka előtt helyezzük el, ami lehetővé teszi az adatok előfeldolgozását, tisztítását és gazdagítását, mielőtt azok belépnének a Kafka rendszerbe. Ez az elrendezés biztosítja, hogy a Kafka-ba bekerülő üzenetek JSON formátumban legyenek, miközben a Logstash kizárólag a Logstash hosztokkal való kapcsolatokat kezeli. Az ilyen típusú elhelyezés hátránya, hogy nagy terhelést jelenthet a Logstash számára, mivel minden adat áramlásánál részt kell vennie. Ezért gyakran szükséges egy terheléselosztó alkalmazása a Logstash klaszter előtt.

Másik lehetőségként a Logstash Kafka utáni elhelyezése is alkalmazható, amely lehetővé teszi az üzenetek későbbi gazdagítását, így a biztonsági csapat csak az adatgazdagításra összpontosíthat, nem pedig az adatok előzetes feldolgozására. Ez az elhelyezés hasznos lehet a biztonság orientált környezetekben, ahol a nyers üzenetek megtartása is fontos lehet a megfelelőségi előírások betartásához.

A Logstash köztes elhelyezése lehetővé teszi a biztonsági adatok összesítését és gazdagítását a Kafka témák között, így az adatfolyamokat a későbbi feldolgozáshoz lehet továbbítani. A köztes elhelyezés lehetővé teszi a különböző üzenettípusok, például a nyers és a feldolgozott üzenetek szétválasztását, amely előnyös lehet a megfelelőségi előírásoknak való megfelelés érdekében. Azonban ez a megoldás további Kafka témák létrehozását igényli, és a Kafka adatmegőrzési szabályai szerint az előfeldolgozott üzenetek időtartama korlátozható.

Az ideális megoldás kiválasztása előtt érdemes figyelembe venni minden elhelyezési típus előnyeit és hátrányait, és gondosan mérlegelni, hogy melyik elrendezés felel meg a legjobban a rendszer igényeinek. Egy jó döntés meghozatala mindig könnyebb, mint később, működés közben módosítani a már implementált rendszert.

A Kafka és Logstash kombinációja nemcsak a biztonsági adatfeldolgozás szempontjából lehet előnyös, hanem a rendszer teljesítményének növelésében is kulcsszerepet játszhat, ha megfelelően van konfigurálva és elhelyezve a rendszerben. Az Apache Kafka és a Logstash rugalmas és skálázható megoldást kínálnak az adatok kezelésére, miközben biztosítják a szükséges biztonsági intézkedéseket, és lehetővé teszik a megfelelőségi előírások betartását.