Python w połączeniu z Pandas i Beautiful Soup to potężne i elastyczne narzędzia. Pozwalają wydobyć dane i przekształcać je do użytecznej formy. Cała gra nabiera rumieńców gdy zaczynamy orientować się o jakie stawki się toczy.

Myśl przewodnia pierwszej konferencji Data Driven Innovation organizowanej przez Computerworld:

Świat, który znamy dzisiaj nie może istnieć bez paliwa jakim jest nowa ropa, czyli dane. Dzięki rozwojowi technologii wytwarzamy i magazynujemy ich ogromne ilości. Dobrze wykorzystane mogą stać się naszym najlepszym sprzymierzeńcem w szybko zmieniającym się świecie.

Faktem jest też, że każda dowolnie wybrana organizacja ma 80 % lub więcej swoich danych w postaci nieustrukturyzowanej tzw. „dark data” co sprawia trudności w ich analizowaniu, wyciąganiu wniosków i podejmowaniu decyzji (o tym w tym wpisie).

Mając to wszystko na uwadze, kluczowym aspektem staje się umiejętne wydobycie danych z posiadanych już źródeł.

Dzisiejszy wpis to próba praktycznego ogarnięcia małego  fragmentu firmowej rzeczywistości. Przykład będzie mały, ale konkretny i dość uniwersalny by można było go zastosować w innym obszarze. 

Schemat działania jest dosyć prosty i sprowadza się do kilku kroków rozbitych na konkretne zadania:

  • znaleźć źródło,
  • dobrać odpowiednie narzędzia,
  • oczyścić dane,
  • przetransportować w odpowiednie miejsce,
  • zautomatyzować proces.

W całym modelu to źródło i dane determinują jakich narzędzi będziemy używać.

Definiowanie Źródła

Źródło danych jest stosunkowo łatwo określić. Jest to miejsce, gdzie gromadzą się dane pod różną postacią. Zadanie jest takie, aby określić to miejsce i przekierować dane w miejsce, gdzie będą miały one bardziej przydatną formę, po to by mogły być zasobem służącym do dalszych analiz. W dowolnej organizacji możemy zdefiniować mnóstwo takich miejsc:

  • skrzynki pocztowe,
  • przeglądarki,
  • foldery lokalne,
  • dyski sieciowe,
  • pamięci wewnętrzne różnych urządzeń,
  • IoT (Internet Rzeczy) i wiele, wiele innych.

Same dane mogą mieć różną postać: TXT, Excele, PDF-y, XML-e, JSON-y, CSV, HTML-e, wiadomości pocztowe, obrazy, pliki dźwiękowe.

W projekcie wykorzystamy pirometr (bezdotykowy termometr), który mierzy co minutę 5 różnych parametrów i zapisuje je w pamięci wewnętrznej w postaci pliku CSV. Co jakiś czas tworzony jest nowy plik aż do wyczerpania pamięci. Urządzenie jest podpięte do sieci (ma swój adres IP). Aktualne parametry i historyczne pomiary można pobrać przez stronę urządzenia wpisując jego adres IP w dowolną przeglądarkę.

Strona Urządzenia w Sieci.

Po przejściu na kartę pamięci (CompactFlash Card), ukazuje nam się prawdziwe źródło danych.

Lista Plików z Archiwalnymi Pomiarami

Przy każdym pliku jest hiperłącze pozwalające pobrać plik CSV. Przykładowy plik z danymi po otwarciu prezentuje się tak jak poniżej.

Plik CSV Od Środka

Jak widać, dane są w całkiem niezłej formie, czekają tylko na interwencje. Warto zatem dobrać odpowiednie narzędzia.

Skrzynka z Narzędziami

Tytuł tego wpisu jest spoilerem i sugeruje jakie narzędzia będą w użyciu. Nie jest to oczywiście zamknięty zbiór. Sam projekt da się zrealizować na kilka różnych sposobów. Często sam w trakcie pracy zmieniam narzędzia, by lepiej dostosować się do kontekstu. Myślę, że to jest w porządku, o ile realizujemy wyznaczony cel.

Poniżej lista bibliotek i narzędzi wykorzystanych w projekcie, z krótkim opisem do czego, nam posłużą:

  • Python – język programowania, z którym wykonamy cały projekt,
  • Requests – to prosta i elegancka biblioteka HTTP dla Pythona, pozwoli pobrać stronę HTML urządzenia,
  • Urllib – biblioteka do pracy z adresami URL, zamieni nazwy plików i parametry do połączenia z bazą danych na adresy URL,
  • Beaufiful Soup – biblioteka, która z pobranej strony pozwoli wyciągnąć nazwy plików archiwum, by stworzyć do nich adresy URL,
  • Pandas – mając adresy URL plików CSV, wykorzystamy je, by wczytać dane do obiektów DataFrame (w celu dalszych operacji),
  • Chardet – biblioteka, która pomoże nam rozwiązać pewien problem związany z wczytaniem pliku CSV (o tym później),
  • SQLAlchemy – biblioteka, która wyśle dane do bazy danych,
  • Program Bazodanowy – tutaj, Microsoft Server 2012 (wybór programu miał u mnie wpływ na automatyzację przesyłania danych – wykorzystanie w tym celu SQL Server Agend i tzw. Job-ów, o czym dalej).

Część z powyższych bibliotek była pewna od początku, część pojawiła się w trakcie prac. Zaprezentowany zestaw pozwolił na zrealizowanie całego zadania: przeniesienia danych archiwalnych jednorazowo oraz automatyzację w przypadku nowych danych.

Mamy narzędzia, zacznijmy od pożywnej pięknej zupy.

Piękna Zupa

Samej biblioteki Beautiful Soup nie będę opisywał. Wspomnę tylko, że jest to szeroko stosowane narzędzie do tzw. Web Scrappingu, czyli metody wyciągania danych ze stron internetowych.

Poniżej widok strony WWW urządzenia z listą plików archiwalnych.

To samo w HTML-u wygląda tak:

<head><title>ScreenRecorder - /StorageCard/KD7/</title>
</head>
<body>
<h1>ScreenRecorder - /StorageCard/KD7/</h1>
<hr/><a href="/StorageCard/">[To Parent Directory]</a><br/><br/>
<pre>6/28/2016    12:00 AM             0 <a href="20160627 062900 1 G1 - Process Group 1.csv">20160627 062900 1 G1 - Process Group 1.csv</a><br/>   
8/23/2016    12:00 AM       5934624 <a href="20160628 095900 1 G1 - Process Group 1.csv">20160628 095900 1 G1 - Process Group 1.csv</a><br/>
8/23/2016    12:00 AM          6400 <a href="20160627 114012 1 - AuditLog.csv">20160627 114012 1 - AuditLog.csv</a><br/>
8/29/2016    12:00 AM       2206818 <a href="20160812 220800 1 G1 - Process Group 1.csv">20160812 220800 1 G1 - Process Group 1.csv</a><br/>
8/29/2016    12:00 AM          4186 <a href="20160829 105700 1 G1 - Process Group 1.csv">20160829 105700 1 G1 - Process Group 1.csv</a><br/>
8/29/2016    12:00 AM        267124 <a href="20160829 114000 1 G1 - Process Group 1.csv">20160829 114000 1 G1 - Process Group 1.csv</a><br/>   
8/31/2016    12:00 AM        285432 <a href="20160831 094800 1 G1 - Process Group 1.csv">20160831 094800 1 G1 - Process Group 1.csv</a><br/>    
9/2/2016    12:00 AM         22728 <a href="20160902 103400 1 G1 - Process Group 1.csv">20160902 103400 1 G1 - Process Group 1.csv</a><br/>
9/2/2016    12:00 AM          1974 <a href="20160902 142000 1 G1 - Process Group 1.csv">20160902 142000 1 G1 - Process Group 1.csv</a><br/>    
9/2/2016    12:00 AM        771622 <a href="20160902 144000 1 G1 - Process Group 1.csv">20160902 144000 1 G1 - Process Group 1.csv</a><br/>
9/20/2016    12:00 AM       1924910 <a href="20160915 134110 0 G1 - Process Group 1.csv">20160915 134110 0 G1 - Process Group 1.csv</a><br/>   
</pre><hr/>
</body>

Zadanie jest takie. Wyłowić z HTML-a nazwy plików, tak aby zrobić do nich adresy URL. Tworzymy listę plików na bazie wartości atrybutu href obiektu a.

<a href=”20160627 062900 1 G1 – Process Group 1.csv„>20160627 062900 1 G1 – Process Group 1.csv</a>

Technicznie cała operacja mieści się w kilku linijkach kodu.

# IMPORT POTRZEBNYCH BIBLIOTEK
from bs4 import BeautifulSoup
import requests
import urllib

# adres sieciowy urządzenia i danych na karcie pamieci
html_site = 'http://192.168.0.201/StorageCard/KD7/'

# pobranie strony HTML
r = requests.get(html_site)
soup = BeautifulSoup(r.text, 'html.parser')

# stworzenie listy plików z atrubutów 'href' obiektu 'a' pobranej strony
lista_plikow = [link.get('href') for link in soup.find_all('a') if 'G1' in str(link)]
# zamiana nazw plików na adresy URL
lista_plikow_ref = [urllib.parse.urljoin(html_site , urllib.parse.quote(item)) for item in lista_plikow]

Po tej operacji lista plików do pobrania jest gotowa. Pora wykorzystać drugiego tytułowego bohatera, czyli Pandas. Teoretycznie możemy już wczytywać dane do obiektu DataFrame, ale jest pewien problem.

Kodowanie i Chardet

Próba wczytania pliku do obiektu DataFrame:

df = pd.read_csv(lista_plikow_ref[0]')

skutkuje błędem:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte

Domyślne kodowanie dla obiektów DataFrame to utf-8, widocznie w plikach CSV jest inne skoro mamy błąd. Z pomocą przychodzi narzędzie Chardet (opisane w tym tutorialu przy okazji usuwania problemów z kodowaniem).

Pobranie jednego pliku i sprawdzenie go za pomocą Chardet dało odpowiedź, jakiego kodowania użyto w plikach.

# IMPORT POTRZEBNYCH BIBLIOTEK
import chardet

# sprawdzamy pierwsze 10 000 bajtów by odgadnąć kodowanie znaków
with open("20160628 095900 1 G1 - Process Group 1.csv", 'rb') as rawdata:
    result = chardet.detect(rawdata.read(10000))

# sprawdzamy jakie może być kodowanie
print(result)
{'encoding': 'UTF-16', 'confidence': 1.0, 'language': ''}

Ostatecznie wczytanie plików powinno wyglądać tak (separatorem między kolumnami jest tabulator).

# IMPORT POTRZEBNYCH BIBLIOTEK
import pandas as pd

# wczytanie danych z kodowaniem 'utf-16' i separatorem '\t'
df = pd.read_csv(lista_plikow_ref[0],sep='\t',encoding='utf-16')

Pandas i Czyszczenie Danych

Eksploracja poszczególnych plików ujawniła kilka problemów, z którymi trzeba się zmierzyć:

  • zmieniają się też typy danych w poszczególnych kolumnach, raz jest to liczba rzeczywista, raz łańcuch znaków, raz liczba całkowita,
  • nazwy kolumn dla poszczególnych parametrów zmieniają się,
  • pojawiają się błędy pomiaru, co jakiś czas występują wartości znacząco wykraczające poza dopuszczalne granice.

Wszystkie kolumny związane z parametrami zamienimy na typ float.

# Zamiana kolumn na typ float
def series_to_float(df):
    df = df.astype(str)
    df = df.str.replace(',', '.')
    return df.astype(float)

Ujednolicimy nazwy kolumn. Te same nazwy posłużą do stworzenia odpowiedniej tabeli w bazie danych. Kolumnę, która zawiera czas wykonania pomiaru, zamieniamy na typ datatime.

# Zamiana nazw kolumn i zmiana typów na float
def rename_and_retype(df):
    dataTypeDict = dict(df.dtypes)
    licznik = 0
    for key, value in dataTypeDict.items():
        if licznik == 0:
            df.rename(columns={key: 'godzina_pomiaru'}, inplace=True)
            df['godzina_pomiaru'] = pd.to_datetime(df['godzina_pomiaru'])
        elif licznik == 1:
            df.rename(columns={key: 'temperatura_rurociag'}, inplace=True)
            df['temperatura_rurociag'] = series_to_float(df['temperatura_rurociag'])
        elif licznik == 2:
            df.rename(columns={key: 'temperatura_filtr'}, inplace=True)
            df['temperatura_filtr'] = series_to_float(df['temperatura_filtr'])
        elif licznik == 3:
            df.rename(columns={key: 'podcisnienie_rurociag'}, inplace=True)
            df['podcisnienie_rurociag'] = series_to_float(df['podcisnienie_rurociag'])
        elif licznik == 4:
            df.rename(columns={key: 'temperatura_struga'}, inplace=True)
            df['temperatura_struga'] = series_to_float(df['temperatura_struga'])
        elif licznik == 5:
            df.rename(columns={key: 'temperatura_induktor'}, inplace=True)
            df['temperatura_induktor'] = series_to_float(df['temperatura_induktor'])
        elif licznik == 6:
            df.drop(key, axis=1, inplace=True)
        licznik += 1
    return df

Ostatecznie spakujemy wszystko w jedną funkcję, gdzie przy okazji pozbędziemy się błędów pomiaru.

# ostateczne czyszczenie z usunięciem błędnych pomiarów
def clean(df):
    df.reset_index(drop=True, inplace=True)
    df = rename_and_retype(df)

    df.loc[df[df.temperatura_struga > 2000.0].temperatura_struga.index, ['temperatura_struga']] = 0
    df.loc[df[df.temperatura_struga < -2000.0].temperatura_struga.index, ['temperatura_struga']] = 0
    df.loc[df[df.podcisnienie_rurociag > 2000.0].podcisnienie_rurociag.index, ['podcisnienie_rurociag']] = 0
    df.loc[df[df.podcisnienie_rurociag < -1000].podcisnienie_rurociag.index, ['podcisnienie_rurociag']] = 0
    df.loc[df[df.temperatura_induktor > 2000.0].temperatura_induktor.index, ['temperatura_induktor']] = 0
    df.loc[df[df.temperatura_induktor < -2000.0].temperatura_induktor.index, ['temperatura_induktor']] = 0

Po tych operacjach dane są gotowe by trafić do punktu przeznaczenia.

Miejsce Docelowe

Dla naszych oczyszczonych danych musimy przygotować odpowiednie miejsce. Mam tu na myśli dedykowaną tabelę w bazie danych.

Instalacja serwera bazodanowego, tworzenia nowej bazy danych to osobny i rozległy temat. Zależy od oprogramowania, które do tego celu wybierzemy. Zakładam, że zarówno że serwer bazodanowy, jak i gotową bazę danych już mamy. Przykładowy tutorial jak to zrobić tutaj.

Poniżej gotowa struktura tabeli i kod SQL uruchamiany na serwerze bazodanowym, który tworzący tabelę docelową.

/****** Object:  Table [dbo].[pomiary_pirometr]    Script Date: 2020-10-27 08:48:22 ******/
SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

CREATE TABLE [dbo].[pomiary_pirometr](
	[id_pomiaru] [int] IDENTITY(1,1) NOT NULL,
	[godzina_pomiaru] [smalldatetime] NULL,
	[temperatura_rurociag] [decimal](4, 1) NULL,
	[temperatura_filtr] [decimal](4, 1) NULL,
	[podcisnienie_rurociag] [decimal](4, 1) NULL,
	[temperatura_struga] [decimal](5, 1) NULL,
	[temperatura_induktor] [decimal](4, 1) NULL,
	[czy_aktualny] [bit] NULL,
	[timestamp] [datetime] NULL,
PRIMARY KEY CLUSTERED 
(
	[id_pomiaru] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

GO

ALTER TABLE [dbo].[pomiary_pirometr] ADD  DEFAULT ((1)) FOR [czy_aktualny]
GO

ALTER TABLE [dbo].[pomiary_pirometr] ADD  DEFAULT (getdate()) FOR [timestamp]
GO

Mamy już oczyszczone dane i miejsce docelowe. Pora wypełnić tabelę.

Transport Danych

Za transport danych odpowiada biblioteka SQLAlchemy. Duży plus tej biblioteki to duża elastyczność i współpraca z takimi systemami jak: SQLite, Postgresql, MySQL, Oracle, MS-SQL, Firebird, Sybase.

Przy gotowych i oczyszczonych danych cały transport danych archiwalnych do naszej tabeli zawiera się w poniższym kodzie.

# IMPORT POTRZEBNYCH BIBLIOTEK
from sqlalchemy import create_engine

# Dane do połączenia z bazą danych
server = 'XXX.XXX.XXX.XXX' # adres IP naszego serwera
database = 'nazwa_bazy_danych'     # baza danych na serwerze
username = 'nazwa_uzytkownika'     # login użytkownika
password = 'haslo_uzytkownika'     # hasło
conn_string= 'DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password

# zamiana łańcucha połączenia na URL
params = urllib.parse.quote_plus(conn_string)
# tworzymy połączenie z bazą
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params,fast_executemany=True)

# KROK 1: TRANSPORT DANYCH ARCHIWALNYCH
# oczyszczenie i zapis każdego pliku w bazie danych
for item in lista_plikow_ref[1:]:
    # wczytujemy po kolei każdy plik do obiektu DataFrame
    df = pd.read_csv(item, sep='\t', encoding='utf-16')
    # czyścimy
    clean(df)
    # transportujemy do bazy
    df.to_sql('pomiary_pirometr', con=engine, if_exists='append', index=False)
    # komunikat potwierdzający dodanie
    print('Dodano: ',item)

Gdy mamy już nasze dane możemy zadbać o ostatni krok, aby kolejne pomiary, które pojawiają się co minutę, same już trafiały do naszej tabeli.

Automatyzacja

Automatyzację można przeprowadzić na wiele sposobów. Jak ją przeprowadzimy, zależy od wielu czynników. Od systemu operacyjnego, na jakim nasz program będzie uruchamiany, systemu bazodanowego, z jakiego korzystamy itd.

Całość jednak sprowadza się do uruchomienia co jakiś czas kodu (w zależności jak świeże dane będziemy chcieli mieć w tabeli), który zrobi coś takiego:

  1. zajrzy do naszej tabeli i sprawdzi, jaka jest data i godzina ostatniego najbliżej obecnej chwili pomiaru (najnowszego),
  2. pobierze adresy URL dwóch ostatnich plików z archiwum (Dlaczego dwóch? Ponieważ w zależności od ustawionego czasu wykonania kodu część zapisów może być w przed ostatnim pliku archiwum a nowe wpisy w ostatnim pliku),
  3. otworzy dwa ostatnie pliki,
  4. oczyści dane metodami, które przygotowaliśmy,
  5. wybierze tylko te dane, których dane pomiaru są większe od ostatniego pomiaru (krok 1),
  6. przerzuci dane do bazy, dodając je do tych już istniejących.

Kod do tego rozwiązania:

# # KROK 2: AUTOMATYZACJA WCZYTYWANIA NOWYCH DANYCH
# oczyszczenie i zapis każdego pliku w bazie danych
for item in lista_plikow_ref[-2:]:
    df = pd.read_csv(item, sep='\t', encoding='utf-16')
    clean(df)
    with engine.connect() as connection:
        result = connection.execute('select max(godzina_pomiaru) from pomiary_pirometr')
        for row in result:
            df = df[df.godzina_pomiaru > row[0]]
            df.to_sql('pomiary_pirometr', con=engine, if_exists='append', index=False)

Pozostało tylko sprawić, żeby ten kod uruchamiał się co jakiś czas. Jak wspomniałem, można zrobić to na różne sposoby: harmonogram zadań albo utworzenie usługi lub demona działającego w tle. Ja wykorzystam tzw. Job-a, jedna z funkcjonalności SQL Server Agent w systemie Windows Server 2012.

SQL Server Agent pilnuje aby co godzinę uruchomił się plik wsadowy o nazwie start_pirometr.bat.

Dlaczego plik wsadowy?

Bo dla programu stworzone za pomocą Anacondy nowe wirtualne środowisko o nazwie pirometr gdzie znajdują się pobrane i zainstalowane wszystkie biblioteki wykorzystywane w tym projekcie. Plik wsadowy ma za zadanie aktywować te wirtualne środowisko, a następnie wykonać nasz kod.

call C:\ProgramData\Anaconda3\Scripts\activate.bat pirometr
call "C:\ProgramData\Anaconda3\python.exe" "D:\python_skrypty\pirometr\pirometr.py"

Instrukcja jak za pomocą Job-a uruchamiać pliki wsadowe tutaj.

Że skrypt jest wykonywany, działa i dane spływają nieustannie, możemy obejrzeć w historii JOB-a.

Historia Sukcesu JOB-ów

Co Dalej?

Wracają na chwilę do konferencji Data Driven Innovation Artur Duszczyk Business Development Director, SUSE Polska w swojej prezentacji wspomniał, że z danymi trzeba coś zrobić. Samo ich zbieranie i posiadanie nic nie daje. To dopiero wstęp do potencjalnych innowacji.

Pytanie zatem co można zrobić z tym, co już mamy?

  • udostępnić innym, aby mogli z tego korzystać np. za pomocą widoków podłączonych w Excelu,
  • pomyśleć o innych urządzeniach, które również można w ten sposób zautomatyzować i łączyć dane za pomocą daty i godziny,
  • spróbować połączyć z systemem produkcyjnym rejestrującym przebieg produkcji, by badać wpływ naszych parametrów na cały proces produkcyjny, jakoś produktów itd.

To tylko część możliwości, jakie przed nami stoją przy danych w takiej postaci.

Podsumowanie

Python, Pandas, Beautiful Soup z paroma innymi bibliotekami w niewielkim programie wydobyły dane, nadały odpowiedni kształt i strukturę, a co najważniejsze sprawiły, że mają swoją wartość użytkową.

Możliwości wykorzystania danych zaczynają być nieporównywalnie większe w stosunku do ich pierwotnej postaci. Obecnie mogą być wykorzystywane do optymalizacji procesu produkcyjnego, podnoszenia jakości finalnego produktu czy redukowania kosztów firmy.

Gotowy skrypt do pobrania tutaj.

A jakie są Twoje potencjalne dane do wydobycia?