import socket
import struct
def read_r_series_data(ip, port, device_code, device_number, read_length):
"""R 시리즈 PLC에서 데이터를 읽어오는 함수"""
# SLMP 프레임 생성
command = b'\x04\x01' # 워드 데이터 읽기 명령어
sub_command = b'\x00\x00'
device_code_bytes = device_code.encode('ascii')
device_number_bytes = struct.pack('<H', device_number)
read_length_bytes = struct.pack('<H', read_length)
data = command + sub_command + device_code_bytes + device_number_bytes + read_length_bytes
frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data
# 소켓 통신
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, port))
sock.sendall(frame)
response = sock.recv(1024)
sock.close()
# 응답 데이터 처리
if response[11] == 0x00: # 정상 응답 확인
data_start = 18 # 데이터 시작 위치
read_data = struct.unpack('<' + 'H' * read_length, response[data_start:data_start + read_length * 2])
return read_data
else:
return None # 오류 발생
# 사용 예시
ip_address = '192.168.1.10'
port_number = 5006
device_code = 'D' # jk 레지스터
device_number = 100
read_length = 10
while True:
print("\n기능 선택:")
print("1. 데이터 읽기")
print("2. 데이터 쓰기")
print("3. PLC 시작")
print("4. PLC 스캔")
print("5. PLC 정지")
print("0. 종료")
choice = input("선택: ")
if choice == '1':
read_data = read_r_series_data(ip_address, port_number, device_code, device_number, read_length)
if read_data:
print(f"읽은 데이터: {read_data}")
else:
print("데이터 읽기 실패")
elif choice == '2':
write_data = tuple(map(int, input("쓸 데이터 (쉼표로 구분): ").split(',')))
if write_r_series_data(ip_address, port_number, device_code, device_number, write_data):
print("데이터 쓰기 성공")
else:
print("데이터 쓰기 실패")
elif choice == '3':
if plc_control(ip_address, port_number, 'start'):
print("PLC 시작 성공")
else:
print("PLC 시작 실패")
time.sleep(1)
elif choice == '4':
if plc_control(ip_address, port_number, 'scan'):
print("PLC 스캔 성공")
else:
print("PLC 스캔 실패")
time.sleep(1)
elif choice == '5':
if plc_control(ip_address, port_number, 'stop'):
print("PLC 정지 성공")
else:
print("PLC 정지 실패")
elif choice == '0':
break
else:
print("잘못된 선택입니다.")
test
import socket
import struct
import time
def read_r_series_data(ip, port, device_code, device_number, read_length):
"""R 시리즈 PLC에서 데이터를 읽어오는 함수 (오류 처리 강화)"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2) # 2초 타임아웃 설정
sock.connect((ip, port))
# SLMP 프레임 생성
command = b'\x04\x01' # 워드 데이터 읽기 명령어
sub_command = b'\x00\x00'
device_code_bytes = device_code.encode('ascii')
device_number_bytes = struct.pack('<H', device_number)
read_length_bytes = struct.pack('<H', read_length)
data = command + sub_command + device_code_bytes + device_number_bytes + read_length_bytes
frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data
sock.sendall(frame)
response = sock.recv(1024)
sock.close()
if response[11] == 0x00: # 정상 응답 확인
data_start = 18 # 데이터 시작 위치
read_data = struct.unpack('<' + 'H' * read_length, response[data_start:data_start + read_length * 2])
return read_data
else:
print(f"PLC 오류 응답: {response[11]}") # jk오류 코드 출력
return None
except (socket.timeout, ConnectionRefusedError, OSError) as e:
print(f"통신 오류: {e}")
return None
def write_r_series_data(ip, port, device_code, device_number, write_data):
"""R 시리즈 PLC에 데이터를 쓰는 함수 (오류 처리 강화)"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((ip, port))
# SLMP 프레임 생성
command = b'\x14\x01' # 워드 데이터 쓰기 명령어
sub_command = b'\x00\x00'
device_code_bytes = device_code.encode('ascii')
device_number_bytes = struct.pack('<H', device_number)
write_length = len(write_data)
write_length_bytes = struct.pack('<H', write_length)
data = command + sub_command + device_code_bytes + device_number_bytes + write_length_bytes + struct.pack('<' + 'H' * write_length, *write_data)
frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data
sock.sendall(frame)
response = sock.recv(1024)
sock.close()
if response[11] == 0x00: # 정상 응답 확인
return True
else:
print(f"PLC 쓰기 오류 응답: {response[11]}")
return False
except (socket.timeout, ConnectionRefusedError, OSError) as e:
print(f"통신 오류: {e}")
return False
def plc_control(ip, port, control_code):
"""PLC 시작, 정지, 스캔 제어 함수 (오류 처리 강화)"""
control_codes = {
'start': b'\x0A\x01\x00\x00\x01\x00', # 시작
'stop': b'\x0A\x01\x00\x00\x02\x00', # 정지
'scan': b'\x0A\x01\x00\x00\x05\x00' # 스캔
}
if control_code not in control_codes:
print("잘못된 제어 코드입니다.")
return False
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((ip, port))
# SLMP 프레임 생성
data = control_codes[control_code]
frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data
sock.sendall(frame)
response = sock.recv(1024)
sock.close()
if response[11] == 0x00: # 정상 응답 확인
return True
else:
print(f"PLC 제어 오류 응답: {response[11]}")
return False
except (socket.timeout, ConnectionRefusedError, OSError) as e:
print(f"통신 오류: {e}")
return False
# 사용 예시
ip_address = '192.168.1.10'
port_number = 5006
device_code = 'D' # D 레지스터
device_number = 100
read_length = 10
while True:
print("\n기능 선택:")
print("1. 데이터 읽기")
print("2. 데이터 쓰기")
print("3. PLC 시작")
print("4. PLC 스캔")
print("5. PLC 정지")
print("0. 종료")
choice = input("선택: ")
if choice == '1':
read_data = read_r_series_data(ip_address, port_number, device_code, device_number, read_length)
if read_data:
print(f"읽은 데이터: {read_data}")
else:
print("데이터 읽기 실패")
elif choice == '2':
try:
write_data = tuple(map(int, input("쓸 데이터 (쉼표로 구분): ").split(',')))
if write_r_series_data(ip_address, port_number, device_code, device_number, write_data):
print("데이터 쓰기 성공")
else:
print("데이터 쓰기 실패")
except ValueError:
print("잘못된 데이터 형식입니다. 정수 값을 쉼표로 구분하여 입력하세요.")
elif choice == '3':
if plc_control(ip_address, port_number, 'start'):
print("PLC 시작 성공")
else:
print("PLC 시작 실패")
time.sleep(1)
elif choice == '4':
if plc_control(ip_address, port_number, 'scan'):
print("PLC 스캔 성공")
else:
print("PLC 스캔 실패")
time.sleep(1)
elif choice == '5':
if plc_control(ip_address, port_number, 'stop'):
print("PLC 정지 성공")
else:
print("PLC 정지 실패")
elif choice == '0':
break
else:
print("잘못된 선택입니다.")
packet 통신 확인 코드
import socket
import struct
def check_plc_connection(ip, port):
"""R 시리즈 PLC 연결 확인 함수"""
try:
# 소켓 생성 및 연결
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2) # 2초 타임아웃 설정
sock.connect((ip, port))
# SLMP 프레임 생성 (간단한 읽기 명령어)
command = b'\x04\x01' # 워드 데이터 읽기 명령어
sub_command = b'\x00\x00'
device_code_bytes = b'D' # D 레지스터
device_number_bytes = struct.pack('<H', 0) # D0
read_length_bytes = struct.pack('<H', 1) # 1 워드 읽기
data = command + sub_command + device_code_bytes + device_number_bytes + read_length_bytes
frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data
# 프레임 전송 및 응답 수신
sock.sendall(frame)
response = sock.recv(1024)
# 응답 확인
if response and response[11] == 0x00:
print("PLC 연결 성공 및 응답 수신!")
return True
else:
print("PLC 연결 성공, 응답 오류!")
return False
except socket.timeout:
print("PLC 연결 시간 초과!")
return False
except ConnectionRefusedError:
print("PLC 연결 거부됨!")
return False
except Exception as e:
print(f"PLC 연결 중 오류 발생: {e}")
return False
finally:
if 'sock' in locals():
sock.close()
# 사용 예시
plc_ip = '192.168.1.10' # PLC IP 주소
plc_port = 5006 # PLC 포트 번호
if check_plc_connection(plc_ip, plc_port):
print("PLC와 패킷 통신 가능!")
else:
print("PLC와 패킷 통신 불가능!")
'web_security' 카테고리의 다른 글
ntds.dit (1) | 2024.08.30 |
---|---|
국산 토종 종갓집 웹쉘 (1) | 2023.04.18 |
sql injection payloads (0) | 2021.07.20 |
robots.txt 란? (0) | 2020.02.15 |