import socket
import re
import datetime
def _extract_cpu_type(response_hex):
    """Return the text found between the response markers, or "Try Again".

    The text is whatever sits between the ``010101000000`` marker and the
    first run of five space bytes (``2020202020``) in the hex-encoded
    response; every hex pair is decoded to one character.
    """
    match = re.search(r'010101000000(.+?)2020202020', response_hex)
    if match is None:
        # Report the real cause instead of an AttributeError raised on None.
        print("Error parsing response: expected markers not found")
        return "Try Again"
    captured = match.group(1)
    return ''.join(
        chr(int(captured[i:i + 2], 16)) for i in range(0, len(captured), 2)
    )


def send_packet(ip, port, hex_dump, network_num, station_num, cpu_num):
    """Send the payload of a hex dump to ip:port and log the decoded reply.

    Args:
        ip (str): Target IP address; also used as the log file name under
            ``./Excel_Sc/``.
        port (int): Target TCP port.
        hex_dump (str): Hex string; the first 108 hex characters (54 bytes)
            are skipped and only the remainder is sent. Presumably those
            are the link/IP/TCP headers of a captured frame - confirm
            against the caller.
        network_num (str): Network number as a hex string.
        station_num (str): Station number as a hex string.
        cpu_num (str): CPU number as a hex string.

    Returns:
        int: 1 when a reply was received and logged, 0 when any error
        occurred while connecting, exchanging data or writing the log.

    Raises:
        ValueError: If ``hex_dump[108:]`` is not valid hex (raised before
            any network activity).
    """
    packet = bytes.fromhex(hex_dump[108:])
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Set the timeout before connecting so that connect() and
            # sendall() are bounded too, not only recv().
            sock.settimeout(2.0)
            sock.connect((ip, port))
            sock.sendall(packet)
            print("Packet sent successfully!")
            data = sock.recv(1024)

        print("-----------------------")
        print(data)
        print("-----------------------")
        response_hex = data.hex()
        result_string = _extract_cpu_type(response_hex)
        print(f"[+] Received Packet: {response_hex}")

        # Results are appended to one file per target IP.
        with open(f'./Excel_Sc/{ip}', 'a') as f:
            f.write(f"1. Ethernet network : {int(network_num, 16)}\n")
            f.write(f"2. Station Number : {int(station_num, 16)}\n")
            f.write(f"3. Ethernet station number : {int(cpu_num, 16)}\n")
            f.write(f"4. CPU Type : {result_string}\n")
        print("+++++++++++++++++++++++++++++++")
        return 1
    except Exception as e:
        # Callers rely on a 0 return for every kind of failure.
        print(f"Error occurred: {e}")
        return 0
def _probe_ip(ip_address, port_number):
    """Try network/station/CPU number combinations until one succeeds.

    Returns True as soon as ``send_packet`` reports success (1) and False
    when every combination failed.
    """
    for network_num_int in range(1, 35):
        network_num = f"{network_num_int:02x}"
        for station_num_int in range(20, 70):
            station_num = f"{station_num_int:02x}"
            for cpu_num_int in range(1, 20):
                cpu_num = f"{cpu_num_int:02x}"
                print("---------------------")
                print(f"Ethernet Network: {network_num_int}")
                print(f"Ethernet Serial Number: {station_num_int}")
                print(f"CPU Serial Number: {cpu_num_int}")
                hex_dump = f'00d8617fb7985ca6e68d56fb080045000051{station_num}a640008006f67b0a5e59b40a582a1b2f56138a6079996f43dd1feb5018faf0bd470000510100000011110700{network_num}{cpu_num}ff03{network_num}{station_num}fe03000014001c080a0800000000000000040101010000000001'
                print("---------------------")
                # Returning here leaves all three loops at once, so the
                # packet is never re-sent just to decide whether to break.
                if send_packet(ip_address, port_number, hex_dump,
                               network_num, station_num, cpu_num) == 1:
                    return True
    return False


def main():
    """Read the IP list and probe every listed address.

    Reads ``./iplist.txt`` (one address per line), appends a timestamped
    entry per address to ``./ip_log`` and tries number combinations against
    port 5002 until ``send_packet`` reports success. Prints a message and
    returns when the list file does not exist.
    """
    try:
        with open("./iplist.txt", "r") as f:
            iplist = [line.strip() for line in f]
    except FileNotFoundError:
        print("iplist.txt 파일을 찾을 수 없습니다.")
        return

    port_number = 5002
    for ip_address in iplist:
        if not ip_address:
            # Skip blank lines; an empty host would otherwise be probed.
            continue
        print(f"Processing IP: {ip_address}")
        with open('./ip_log', 'a') as f:
            f.write(f"{ip_address} : {datetime.datetime.now()}\n")
        _probe_ip(ip_address, port_number)
        print(datetime.datetime.now())


if __name__ == "__main__":
    main()
# test  (originally typed as "ㅅㄷㄴㅅ" with the Korean keyboard layout active)
import socket
import datetime
import re
def _decode_hex_fields(hex_fields):
    """Decode each hex string to text (one character per hex pair), spaces removed."""
    return [
        ''.join(
            chr(int(field[i:i + 2], 16)) for i in range(0, len(field), 2)
        ).replace(" ", "")
        for field in hex_fields
    ]


def send_packet(ip, port, hex_dump):
    """Send the payload of a hex dump to ip:port and collect names from the reply.

    The first 108 hex characters of ``hex_dump`` are skipped and the rest is
    sent over TCP. Null bytes are removed from the reply, the span following
    the first ``d1`` up to a run of seven space bytes is dropped, and the
    text found around each ``515047`` ("QPG") marker is decoded and appended
    to the module-level lists ``module_names`` and ``project_titles``.

    Args:
        ip (str): Target IP address.
        port (int): Target TCP port.
        hex_dump (str): Hex string whose tail (from offset 108) is sent.

    Returns:
        None. Any error during the exchange or parsing is printed and
        swallowed.

    Raises:
        ValueError: If ``hex_dump[108:]`` is not valid hex (raised before
            any network activity).
    """
    global module_names, project_titles
    packet = bytes.fromhex(hex_dump[108:])
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Set the timeout before connecting so that connect() and
            # sendall() are bounded too, not only recv().
            sock.settimeout(2.0)
            sock.connect((ip, port))
            sock.sendall(packet)
            print("Packet sent successfully!")
            data = sock.recv(1024)

        print(f"[+] Received Packet: {data.hex()}")
        # Strip null bytes on the raw bytes: removing "00" from the hex
        # text would also eat zeros that straddle two bytes
        # (e.g. 0x10 0x05 -> "15").
        dump_val = data.replace(b"\x00", b"").hex()
        print(f"[2+] Received Packet (cleaned): {dump_val}")

        remove_val = re.findall(r'd1(.+?)20202020202020', dump_val)
        if remove_val:
            dump_val = dump_val.replace(remove_val[0], "")
        print(f"[3+] Received Packet (removed): {dump_val}")

        # NOTE: the patterns run on hex text, so a match is not guaranteed
        # to start on a byte boundary.
        module_val = re.findall(r'2020(.+?)515047', dump_val)
        module_val2 = re.findall(r'515047(.+?)2020', dump_val)
        print("Module and Title extraction:")
        del_set = ['20', '00']
        module_names_hex = [i for i in module_val if i not in del_set]
        project_titles_hex = [i for i in module_val2 if i not in del_set]
        print(f"Module names (hex): {module_names_hex}")
        print(f"Project titles (hex): {project_titles_hex}")

        module_names.extend(_decode_hex_fields(module_names_hex))
        project_titles.extend(_decode_hex_fields(project_titles_hex))
    except Exception as e:
        print(f"Error occurred: {e}")
def main():
    """Prompt for a target and its numbers, send one request and print the findings.

    Resets the module-level ``module_names`` and ``project_titles`` lists,
    then, once per entry of the test IP list, asks for an IP address and
    three integers, formats each integer as two-digit hex, embeds them in
    the request dump and hands it to ``send_packet`` on port 5002. The
    entries of the test list themselves are not used, and the entered
    integers are not range-checked.
    """
    global module_names, project_titles
    test_targets = ["10.88.42.124"]  # IP list used for testing
    module_names = []
    project_titles = []

    def ask_hex_byte(prompt):
        # Read an integer and render it as (at least) two hex digits.
        return format(int(input(prompt)), "02x")

    for _ in test_targets:
        target_ip = input("Type IP address: ")
        target_port = 5002
        network_hex = ask_hex_byte("1. Ethernet network: ")
        station_hex = ask_hex_byte("2. Ethernet station number: ")
        module_hex = ask_hex_byte("3. Ethernet Module number: ")
        request_dump = f'00d8617fb7985ca6e68d56fb08004500005b{station_hex}2140008006f9950a5e59b40a582a7c6fb4138a192a0755de60748d5018fbe354620000510124000011110700{network_hex}{module_hex}ff03{network_hex}{station_hex}fe0300001e001c080a080000000000000004181826000000000000000000000013000000'
        send_packet(target_ip, target_port, request_dump)
        print(datetime.datetime.now())

    print("Module Names:", module_names)
    print("Project Titles:", project_titles)


if __name__ == "__main__":
    main()
# 강화 (enhanced version)
import socket
import datetime
import re
def _decode_hex_fields(hex_fields):
    """Decode each hex string to text (one character per hex pair), spaces removed."""
    return [
        ''.join(
            chr(int(field[i:i + 2], 16)) for i in range(0, len(field), 2)
        ).replace(" ", "")
        for field in hex_fields
    ]


def send_packet(ip, port, hex_dump):
    """Send the payload of a hex dump to ip:port and record names from the reply.

    The first 108 hex characters of ``hex_dump`` are skipped and the rest is
    sent over TCP. Null bytes are removed from the reply, the span following
    the first ``d1`` up to a run of seven space bytes is dropped, and the
    text found around each ``515047`` ("QPG") marker is decoded. One dict
    with the keys ``ip``, ``module_names`` and ``project_titles`` is
    appended to the module-level ``results`` list.

    Args:
        ip (str): Target IP address.
        port (int): Target TCP port.
        hex_dump (str): Hex string whose tail (from offset 108) is sent.

    Returns:
        None. Any error during the exchange or parsing is printed and
        swallowed.

    Raises:
        ValueError: If ``hex_dump[108:]`` is not valid hex (raised before
            any network activity).
    """
    global results
    packet = bytes.fromhex(hex_dump[108:])
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Set the timeout before connecting so that connect() and
            # sendall() are bounded too, not only recv().
            sock.settimeout(2.0)
            sock.connect((ip, port))
            sock.sendall(packet)
            print("Packet sent successfully!")
            data = sock.recv(1024)

        print(f"[+] Received Packet (Hex): {data.hex()}")
        # Strip null bytes on the raw bytes: removing "00" from the hex
        # text would also eat zeros that straddle two bytes
        # (e.g. 0x10 0x05 -> "15").
        dump_val = data.replace(b"\x00", b"").hex()
        print(f"[2+] Cleaned Packet (Hex): {dump_val}")

        remove_val = re.findall(r'd1(.+?)20202020202020', dump_val)
        if remove_val:
            dump_val = dump_val.replace(remove_val[0], "")
        print(f"[3+] Removed Data (Hex): {dump_val}")

        # NOTE: the patterns run on hex text, so a match is not guaranteed
        # to start on a byte boundary.
        module_val = re.findall(r'2020(.+?)515047', dump_val)
        module_val2 = re.findall(r'515047(.+?)2020', dump_val)
        print("Module and Title Extraction:")
        del_set = ['20', '00']
        module_names_hex = [i for i in module_val if i not in del_set]
        project_titles_hex = [i for i in module_val2 if i not in del_set]
        print(f"Module Names (Hex): {module_names_hex}")
        print(f"Project Titles (Hex): {project_titles_hex}")

        results.append({
            'ip': ip,
            'module_names': _decode_hex_fields(module_names_hex),
            'project_titles': _decode_hex_fields(project_titles_hex),
        })
    except Exception as e:
        print(f"Error occurred: {e}")
def main():
    """Prompt for a target and its numbers, send one request and print the results.

    Resets the module-level ``results`` list, then, once per entry of the
    test IP list, asks for an IP address and three integers, formats each
    integer as two-digit hex, embeds them in the request dump and hands it
    to ``send_packet`` on port 5002. The entries of the test list themselves
    are not used, and the entered integers are not range-checked. Finally
    every collected result is printed.
    """
    global results
    test_targets = ["10.88.42.124"]  # IP list used for testing
    results = []

    def ask_hex_byte(prompt):
        # Read an integer and render it as (at least) two hex digits.
        return format(int(input(prompt)), "02x")

    for _ in test_targets:
        target_ip = input("Type IP address: ")
        target_port = 5002
        network_hex = ask_hex_byte("1. Ethernet network: ")
        station_hex = ask_hex_byte("2. Ethernet station number: ")
        module_hex = ask_hex_byte("3. Ethernet Module number: ")
        request_dump = f'00d8617fb7985ca6e68d56fb08004500005b{station_hex}2140008006f9950a5e59b40a582a7c6fb4138a192a0755de60748d5018fbe354620000510124000011110700{network_hex}{module_hex}ff03{network_hex}{station_hex}fe0300001e001c080a080000000000000004181826000000000000000000000013000000'
        send_packet(target_ip, target_port, request_dump)
        print(f"Time: {datetime.datetime.now()}")

    # Print the collected results
    for entry in results:
        print(f"\nIP: {entry['ip']}")
        print(f"Module Names: {entry['module_names']}")
        print(f"Project Titles: {entry['project_titles']}")


if __name__ == "__main__":
    main()
# - 첫 번째 코드는 PLC 장비의 기본적인 정보를 수집하고, 네트워크 탐색 기능을 제공합니다.
# - 두 번째 코드는 PLC 장비의 상세 정보를 수집하고, 데이터 분석 기능을 강화했습니다.
#
# 통신 확인
import socket
def check_plc_connection(ip_address, port=5001):
    """Check whether a TCP packet exchange with ip_address:port is possible.

    Connects with a 2-second timeout, sends a fixed 6-byte test packet and
    waits for up to 1024 bytes of reply. (The original note says this is
    meant for Q-series PLCs.)

    Args:
        ip_address (str): Address to connect to.
        port (int): TCP port, 5001 by default.

    Returns:
        bool: True when the connection and the exchange completed, False on
        timeout, refused connection or any other error (a message is
        printed in each case).
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
            conn.settimeout(2)  # 2-second timeout
            conn.connect((ip_address, port))
            print(f"{ip_address}:{port}에 연결 성공!")
            # Send a simple test packet (change as needed)
            conn.send(b'\x00\x0A\x00\x00\x00\x00')
            reply = conn.recv(1024)
            print(f"응답: {reply}")
    except socket.timeout:
        failure = f"{ip_address}:{port}에서 응답이 없습니다."
    except ConnectionRefusedError:
        failure = f"{ip_address}:{port}에 연결할 수 없습니다. 연결이 거부되었습니다."
    except Exception as exc:
        failure = f"오류 발생: {exc}"
    else:
        return True

    print(failure)
    return False
# Interactive check: ask for an address and report whether the PLC answers.
plc_ip = input("PLC IP 주소를 입력하세요: ")
reachable = check_plc_connection(plc_ip)
print("PLC와 통신 가능합니다." if reachable else "PLC와 통신할 수 없습니다.")