카테고리 없음

Mitsubishi PLC Q Series Control code, using by Python3(MC protocol)

HawordFREAKEK 2025. 2. 24. 20:31
import socket
import re
import datetime

def send_packet(ip, port, hex_dump, network_num, station_num, cpu_num):
    """
    주어진 IP와 포트에 hex 데이터를 전송하고 응답을 처리합니다.

    Args:
        ip (str): 대상 서버의 IP 주소.
        port (int): 대상 서버의 포트 번호.
        hex_dump (str): 전송할 hex 데이터 문자열.
        network_num (str): 네트워크 번호 (hex).
        station_num (str): 스테이션 번호 (hex).
        cpu_num (str): CPU 번호 (hex).

    Returns:
        int: 성공 시 1, 실패 시 0.
    """
    packet = bytes.fromhex(hex_dump[108:])

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((ip, port))
            sock.sendall(packet)
            print("Packet sent successfully!")

            sock.settimeout(2.0)
            data = sock.recv(1024)
            print("-----------------------")
            print(data)
            print("-----------------------")

            dump_val = data.hex()
            try:
                re_val = re.search('010101000000(.+?)2020202020', dump_val).group(1)
                result_string = ''.join(chr(int(re_val[i:i+2], 16)) for i in range(0, len(re_val), 2))
            except Exception as e:
                print(f"Error parsing response: {e}")
                result_string = "Try Again"

            print(f"[+] Received Packet: {data.hex()}")

            with open(f'./Excel_Sc/{ip}', 'a') as f:
                f.write(f"1. Ethernet network : {int(network_num, 16)}\n")
                f.write(f"2. Station Number : {int(station_num, 16)}\n")
                f.write(f"3. Ethernet station number : {int(cpu_num, 16)}\n")
                f.write(f"4. CPU Type : {result_string}\n")

            print("+++++++++++++++++++++++++++++++")
            return 1

    except Exception as e:
        print(f"Error occurred: {e}")
        return 0

def main():
    """
    IP 목록을 읽어 각 IP에 대해 패킷을 전송하고 결과를 기록합니다.
    """
    try:
        with open("./iplist.txt", "r") as f:
            iplist = [line.strip() for line in f]
    except FileNotFoundError:
        print("iplist.txt 파일을 찾을 수 없습니다.")
        return

    port_number = 5002

    for ip_address in iplist:
        print(f"Processing IP: {ip_address}")
        with open('./ip_log', 'a') as f:
            f.write(f"{ip_address} : {datetime.datetime.now()}\n")

        for network_num_int in range(1, 35):
            for station_num_int in range(20, 70):
                for cpu_num_int in range(1, 20):
                    network_num = f"{network_num_int:02x}"
                    station_num = f"{station_num_int:02x}"
                    cpu_num = f"{cpu_num_int:02x}"

                    print("---------------------")
                    print(f"Ethernet Network: {network_num_int}")
                    print(f"Ethernet Serial Number: {station_num_int}")
                    print(f"CPU Serial Number: {cpu_num_int}")

                    hex_dump = f'00d8617fb7985ca6e68d56fb080045000051{station_num}a640008006f67b0a5e59b40a582a1b2f56138a6079996f43dd1feb5018faf0bd470000510100000011110700{network_num}{cpu_num}ff03{network_num}{station_num}fe03000014001c080a0800000000000000040101010000000001'
                    print("---------------------")

                    if send_packet(ip_address, port_number, hex_dump, network_num, station_num, cpu_num) == 1:
                        break
                if send_packet(ip_address, port_number, hex_dump, network_num, station_num, cpu_num) == 1:
                    break
            if send_packet(ip_address, port_number, hex_dump, network_num, station_num, cpu_num) == 1:
                break
        print(datetime.datetime.now())

if __name__ == "__main__":
    main()

ㅅㄷㄴㅅ

import socket
import datetime
import re

def send_packet(ip, port, hex_dump):
    """
    주어진 IP와 포트에 hex 데이터를 전송하고 응답을 처리합니다.
    """
    global module_names, project_titles

    packet = bytes.fromhex(hex_dump[108:])

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((ip, port))
            sock.sendall(packet)
            print("Packet sent successfully!")

            sock.settimeout(2.0)
            data = sock.recv(1024)
            print(f"[+] Received Packet: {data.hex()}")

            dump_val = data.hex().replace("00", "")
            print(f"[2+] Received Packet (cleaned): {dump_val}")

            remove_val = re.findall('d1(.+?)20202020202020', dump_val)
            if remove_val:
                dump_val = dump_val.replace(remove_val[0], "")
            print(f"[3+] Received Packet (removed): {dump_val}")

            module_val = re.findall('2020(.+?)515047', dump_val)
            module_val2 = re.findall('515047(.+?)2020', dump_val)

            print("Module and Title extraction:")

            del_set = ['20', '00']
            module_names_hex = [i for i in module_val if i not in del_set]
            project_titles_hex = [i for i in module_val2 if i not in del_set]

            print(f"Module names (hex): {module_names_hex}")
            print(f"Project titles (hex): {project_titles_hex}")

            module_names_str = [''.join([chr(int(val[i:i+2], 16)) for i in range(0, len(val), 2)]).replace(" ", "") for val in module_names_hex]
            project_titles_str = [''.join([chr(int(val[i:i+2], 16)) for i in range(0, len(val), 2)]).replace(" ", "") for val in project_titles_hex]

            module_names.extend(module_names_str)
            project_titles.extend(project_titles_str)

    except Exception as e:
        print(f"Error occurred: {e}")

def main():
    """
    IP 주소 목록을 입력받아 각 IP 주소에 대해 패킷을 전송하고 결과를 처리합니다.
    """
    global module_names, project_titles

    ip_list = ["10.88.42.124"] #테스트용 ip리스트
    module_names = []
    project_titles = []

    for ip_address1 in ip_list:
        ip_address = input("Type IP address: ")
        port_number = 5002

        ethernet_N_N_1 = int(input("1. Ethernet network: "))
        ethernet_N_N = hex(ethernet_N_N_1).replace("0x", "").zfill(2)

        ethernet_S_N_1 = int(input("2. Ethernet station number: "))
        ethernet_S_N = hex(ethernet_S_N_1).replace("0x", "").zfill(2)

        ethernet_S_N_2 = int(input("3. Ethernet Module number: "))
        CPU_S_N2 = hex(ethernet_S_N_2).replace("0x", "").zfill(2)

        read_val = f'00d8617fb7985ca6e68d56fb08004500005b{ethernet_S_N}2140008006f9950a5e59b40a582a7c6fb4138a192a0755de60748d5018fbe354620000510124000011110700{ethernet_N_N}{CPU_S_N2}ff03{ethernet_N_N}{ethernet_S_N}fe0300001e001c080a080000000000000004181826000000000000000000000013000000'

        send_packet(ip_address, port_number, read_val)
        print(datetime.datetime.now())
        print("Module Names:", module_names)
        print("Project Titles:", project_titles)

if __name__ == "__main__":
    main()

강화

import socket
import datetime
import re

def send_packet(ip, port, hex_dump):
    """주어진 IP와 포트에 hex 데이터를 전송하고 응답을 처리합니다."""
    global results

    packet = bytes.fromhex(hex_dump[108:])

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((ip, port))
            sock.sendall(packet)
            print("Packet sent successfully!")

            sock.settimeout(2.0)
            data = sock.recv(1024)
            print(f"[+] Received Packet (Hex): {data.hex()}")

            dump_val = data.hex().replace("00", "")
            print(f"[2+] Cleaned Packet (Hex): {dump_val}")

            remove_val = re.findall('d1(.+?)20202020202020', dump_val)
            if remove_val:
                dump_val = dump_val.replace(remove_val[0], "")
            print(f"[3+] Removed Data (Hex): {dump_val}")

            module_val = re.findall('2020(.+?)515047', dump_val)
            module_val2 = re.findall('515047(.+?)2020', dump_val)

            print("Module and Title Extraction:")

            del_set = ['20', '00']
            module_names_hex = [i for i in module_val if i not in del_set]
            project_titles_hex = [i for i in module_val2 if i not in del_set]

            print(f"Module Names (Hex): {module_names_hex}")
            print(f"Project Titles (Hex): {project_titles_hex}")

            module_names_str = [''.join([chr(int(val[i:i+2], 16)) for i in range(0, len(val), 2)]).replace(" ", "") for val in module_names_hex]
            project_titles_str = [''.join([chr(int(val[i:i+2], 16)) for i in range(0, len(val), 2)]).replace(" ", "") for val in project_titles_hex]

            results.append({
                'ip': ip,
                'module_names': module_names_str,
                'project_titles': project_titles_str
            })

    except Exception as e:
        print(f"Error occurred: {e}")

def main():
    """IP 주소 목록을 입력받아 각 IP 주소에 대해 패킷을 전송하고 결과를 처리합니다."""
    global results

    ip_list = ["10.88.42.124"]  # 테스트용 IP 리스트
    results = []

    for ip_address1 in ip_list:
        ip_address = input("Type IP address: ")
        port_number = 5002

        ethernet_N_N_1 = int(input("1. Ethernet network: "))
        ethernet_N_N = f"{ethernet_N_N_1:02x}"

        ethernet_S_N_1 = int(input("2. Ethernet station number: "))
        ethernet_S_N = f"{ethernet_S_N_1:02x}"

        ethernet_S_N_2 = int(input("3. Ethernet Module number: "))
        CPU_S_N2 = f"{ethernet_S_N_2:02x}"

        read_val = f'00d8617fb7985ca6e68d56fb08004500005b{ethernet_S_N}2140008006f9950a5e59b40a582a7c6fb4138a192a0755de60748d5018fbe354620000510124000011110700{ethernet_N_N}{CPU_S_N2}ff03{ethernet_N_N}{ethernet_S_N}fe0300001e001c080a080000000000000004181826000000000000000000000013000000'

        send_packet(ip_address, port_number, read_val)
        print(f"Time: {datetime.datetime.now()}")

    # 결과 출력
    for result in results:
        print(f"\nIP: {result['ip']}")
        print(f"Module Names: {result['module_names']}")
        print(f"Project Titles: {result['project_titles']}")

if __name__ == "__main__":
    main()

 

    • 첫 번째 코드는 PLC 장비의 기본적인 정보를 수집하고, 네트워크 탐색 기능을 제공합니다.
      • 두 번째 코드는 PLC 장비의 상세 정보를 수집하고, 데이터 분석 기능을 강화했습니다.

통신 확인

import socket

def check_plc_connection(ip_address, port=5001):
    """Q 시리즈 PLC에 패킷 통신이 가능한지 확인합니다."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(2)  # 2초 타임아웃 설정
            s.connect((ip_address, port))
            print(f"{ip_address}:{port}에 연결 성공!")
            # 간단한 테스트 패킷 전송 (필요에 따라 변경)
            s.send(b'\x00\x0A\x00\x00\x00\x00')
            data = s.recv(1024)
            print(f"응답: {data}")
            return True
    except socket.timeout:
        print(f"{ip_address}:{port}에서 응답이 없습니다.")
        return False
    except ConnectionRefusedError:
        print(f"{ip_address}:{port}에 연결할 수 없습니다. 연결이 거부되었습니다.")
        return False
    except Exception as e:
        print(f"오류 발생: {e}")
        return False

# 테스트
plc_ip = input("PLC IP 주소를 입력하세요: ")
if check_plc_connection(plc_ip):
    print("PLC와 통신 가능합니다.")
else:
    print("PLC와 통신할 수 없습니다.")